入门概述
MQ的产品种类和对比
MQ就是消息中间件。MQ是一种理念,ActiveMQ是MQ的落地产品。不管是哪款消息中间件,都有如下一些技术维度:
kafka
- 编程语言:
scala
。 -
rabbitMQ
编程语言:
erlang
。基于
erlang
语言,不好修改底层,不好查找问题的原因,不建议选用。rocketMQ
编程语言:
java
。-
activeMQ
编程语言:
java
。-
MQ的产生背景
微服务架构后,链式调用是我们在写程序时候的一般流程,为了完成一个整体功能会将其拆分成多个函数(或子模块),比如模块A调用模块B,模块B调用模块C,模块C调用模块D。但在大型分布式应用中,系统间的RPC交互繁杂,一个功能背后要调用上百个接口并非不可能,从单机架构过渡到分布式微服务架构的通例,这些架构会有哪些问题?
1、系统之间接口耦合比较严重
每新增一个下游功能,都要对上游的相关接口进行改造;
- 举个例子:如果系统A要发送数据给系统B和系统C,发送给每个系统的数据可能有差异,因此系统A要对发送给每个系统的数据进行了逐一组装,然后逐一发送;
- 当代码上线后又新增了一个需求:把数据也发送给D,新上了一个D系统也要接受A系统的数据,此时就需要修改A系统,让他感知到D系统的存在,同时把数据处理好再给D。在这个过程你会看到,每接入一个下游系统,都要对系统A进行代码改造,开发联调的效率很低。其整体架构如下:
2、面对大流量并发时容易被冲垮
每个接口模块的吞吐能力是有限的,这个上限能力如果是堤坝,当大流量(洪水)来临时,容易被冲垮。
举个例子秒杀业务:
- 上游系统发起下单购买操作,我就是下单一个操作
下游系统完成秒杀业务逻辑:(读取订单,库存检查,库存冻结,余额检查,余额冻结,订单生产,余额扣减,库存减少,生成流水,余额解冻,库存解冻)
3、等待同步存在性能问题
RPC接口上基本都是同步调用,整体的服务性能遵循“木桶理论”,即整体系统的耗时取决于链路中最慢的那个接口。比如A调用B/C/D都是50ms,但此时B又调用了B1,花费2000ms,那么直接就拖累了整个服务性能。如下:
解决方案
根据上述的几个问题,在设计系统时可以明确要达到的目标:
1,要做到系统解耦,当新的模块接进来时,可以做到代码改动最小:能够解耦
2,设置流量缓冲池,可以让后端系统按照自身吞吐能力进行消费,不被冲垮:能削峰
3,强弱依赖梳理能将非关键调用链路的操作异步化并提升整体系统的吞吐能力:能够异步MQ的作用定义
定义
面向消息的中间件(message-oriented middleware)MOM能够很好的解决以上问题,是指利用高效可靠的消息传递机制与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
- 通过提供消息传递和消息排队模型在分布式环境下提供应用解耦,弹性伸缩,冗余存储、流量削峰,异步通信,数据同步等功能。
大致的过程是这样的:
发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题topic中,在合适的时候,消息服务器回将消息转发给接受者。在这个过程中,发送和接收是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有必然的关系;尤其在发布pub/订阅sub模式下,也可以完成一对多的通信,即让一个消息有多个接受者。
作用
1、采用异步处理模式
- 消息发送者可以发送一个消息而无须等待响应。消息发送者将消息发送到一条虚拟的通道(主题或者队列)上;
- 消息接收者则订阅或者监听该通道。一条消息可能最终转发给一个或者多个消息接收者,这些消息接收者都无需对消息发送者做出同步回应。整个过程都是异步的。
案例:
假如系统A希望发送一个消息给系统B,让他去处理。但是系统A不关注系统B到底怎么处理或者有没有处理好,所以系统A把消息发送给MQ,然后就不管这条消息的“死活了”,接着系统B从MQ里面消费出来处理即可。至于怎么处理,是否处理完毕,什么时候处理,都是系统B的事儿,与系统A无关。
这样的一种通信方式,就是所谓的“异步”通信方式,对于系统A来说,只需要把消息发送给MQ,然后系统B就会异步的去进行处理了,系统A不需要“同步”的等待系统B处理完成。这样的好处是什么呢?两个字:解耦
2、应用系统之间解耦合
- 发送者和接受者不必了解对方,只需要确认消息。
-
整体架构
ActiveMQ入门
ActiveMQ安装
最重要的功能:实现高可用,高性能,可伸缩,易用和安全的企业级面向消息服务的系统;
- 异步消息的消费和处理;
- 控制消息的消费顺序;
- 可以和Spring或者SpringBoot整合简化代码;
-
步骤
1、官网下载并传输到
linux
服务器上:
2、进行解压缩:
命令:tar -zxvf apache-activemq-5.16.4-bin.tar.gz -C /opt/module/
文件夹介绍
ActiveMQ启动及关闭
普通启动
先进入到bin目录,然后执行启动命令:
./activemq start
【ActiveMQ的默认进行端口是61616
】
1、检查是否启动成功:ps -ef | grep activemq
如果查进程时想要屏蔽grep这个查询命令的进程,可以添加-v
这个参数来进行屏蔽:
命令:ps -ef | grep activemq | grep -v grep
;
2、也可以通过检查端口号是否被占用来判断activemq是否启动:
命令1:netstat -anp|grep 61616
;
命令2:lsof -i:61616
;普通关闭、重启
关闭:先进入到bin目录,然后执行启动命令:
./activemq stop
;
重启:先进入到bin目录,然后执行启动命令:./activemq restart
;带运行日志启动
先进入到bin目录,然后执行启动命令:
./activemq start > /路径/myrunmq.log
;
注意: >
指令和**>>**
指令复习:>
指令:输出重定向,会将原来文件的内容覆盖;>>
指令:追加,不会覆盖原来文件的内容,而是追加到文件的尾部;
日志文件路径可以卸载activemq安装路径里面,日志文件名字可以任意写,但后缀必须是
.log
;ActiveMQ控制台
注意:
61616
是ActiveMQ后台进程端口号,8161
是ActiveMQ前台web访问的地址;- 所以浏览器访问
http://ip:8161/admin/
即可访问ActiveMQ控制台,默认用户名都是admin
; - 新版ActiveMQ修改
apache-activemq-5.xx.xx/conf/jetty.xml
文件中的内容 将127.0.0.1
修改为0.0.0.0
(127.0.0.1
代表只能本机访问,远程访问肯定不行)。
补充:记得要开放端口,这样才能访问到控制台:
- 开发某个端口:
firewall-cmd --add-port=端口号/tcp --permanent
; 关闭某个端口:
firewall-cmd --remove-port=端口号/tcp --permanent
;java连接activeMQ入门案例
JMS编码总体架构
【类似于以前的jdbc】
Destination意为目的地,在activeMQ
中有两种:队列(1对1):点对点的信息领域;
- 主题(1对多):发布订阅信息领域;
导入依赖
<!--activemq所需要的jar包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<!--activemq和spring整合的基础包-->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
1对1上手案例-queue队列
消息生产者
示例代码
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
//消息生产者编码
public class JmsProduce {
public static final String ACTIVEMQ_URL="tcp://124.70.84.192:61616";
public static final String QUEUE_NAME="queue_lemon";
//先写死连接地址,tcp协议,以“tcp://”开头,端口号写后端进程端口号
public static void main(String[] args) throws JMSException {
//1、创建连接工厂,根据url地址
//如果用户名密码都是默认的admin的话,连接可以用一个参数的构造方法,
//不然要用三个参数的构造方法,用户名密码url;
ActiveMQConnectionFactory activeMQConnectionFactory=
new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2、通过连接工厂,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
/*3、创建会话session
参数1:是否开启事务;参数2:签收;*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地(具体是队列queue还是主题topic),参数填队列名字;
Queue queue = session.createQueue(QUEUE_NAME);
Destination destination = session.createQueue(QUEUE_NAME);//也可以直接用目的地类
//5、创建消息的生产者(提供者)
MessageProducer messageProducer = session.createProducer(queue);
//6、通过使用消息生产者生产3条消息发送到MQ的队列里面
for (int i=1;i<4;i++){
//7、创建消息(发送字符串)
TextMessage textMessage =
session.createTextMessage("我发送的第" + i + "条消息");
//8、通过消息生产者把消息发送给MQ
messageProducer.send(textMessage);
}
//关闭资源(正着开启,倒着关闭)
messageProducer.close();
session.close();
connection.close();
System.out.println("连接中断");
}
}
activeMQ控制台
**Number Of Pending Messages**
:等待消费的消息;这个是未出队列的数量,公式=总接收数-总出队列数。**Number Of Consumers**
:消费者数量;消费者端的消费者数量。**Messages Enqueued**
:进队消息数;进队列的总消息量,包括出队列的。这个数只增不减。**Messages Dequeued**
:出队消息数;可以理解为是消费者消费掉的数量。
总结:
- 当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。
- 当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。
- 当再来一条消息时,等待消费的消息是1,进入队列的消息就是2。
消息消费者
示例代码
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; //消息消费者编码 public class JmsConsumer { public static final String ACTIVEMQ_URL="tcp://124.70.84.192:61616"; public static final String QUEUE_NAME="queue_lemon"; //先写死连接地址,tcp协议,以“tcp://”开头,端口号写后端进程端口号 public static void main(String[] args) throws JMSException { //1、创建连接工厂,根据url地址 //如果用户名密码都是默认的admin的话 //连接可以用一个参数的构造方法,不然要用三个参数的构造方法,用户名密码url; ActiveMQConnectionFactory activeMQConnectionFactory= new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2、通过连接工厂,获得连接connection并启动访问 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); /*3、创建会话session;参数1:是否开启事务;参数2:签收;*/ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4、创建目的地(具体是队列queue还是主题topic),参数填队列名字; Queue queue = session.createQueue(QUEUE_NAME); //5、创建消息消费者 MessageConsumer messageConsumer = session.createConsumer(queue); while (true){ TextMessage textMessage= (TextMessage)messageConsumer.receive(); //receive()方法可以写一个参数,参数为最长等待时间,不写一直等待 //发送消息是什么格式,介绍消息就需要转化为什么格式 if(null!=textMessage){ System.out.println(textMessage.getText()); }else {break;} }//6、关闭资源(正着开启,倒着关闭) messageConsumer.close(); session.close(); connection.close(); } }
activeMQ控制台
小细节:
**Number Of Consumers**
的值为1的原因是消息消费者的.receive()
方法没有写参数,这就意味着没有收到消息的话一直死等,并且while(true)
死循环,导致消费者程序一直没有结束,如果.receive()
方法中写有参数的话,如果本次在规定时间内没有收到消息,就不会再等待直接进行下一次循环。接收消息的多种方法
同步阻塞方式
```java /1、同步阻塞方式(receive()) 订阅者或着说接收者调用messageConsumer.receive()来接受消息,在接收到消息之前将一直阻塞 / while (true){ TextMessage textMessage= (TextMessage)messageConsumer.receive(); //receive()方法可以写一个参数,参数为最长等待时间,不写一直等待 //发送消息是什么格式,介绍消息就需要转化为什么格式 if(null!=textMessage){
}else {System.out.println(textMessage.getText());
} } messageConsumer.close(); session.close(); connection.close();break;
<a name="rfETn"></a>
###### 异步非阻塞监听器方式
```java
/*2、通过监听的方式来消费消息,来实现有消息就消费,没消息就不管的效果
*/
messageConsumer.setMessageListener(new MessageListener() {
//设置消息监听器,可以采用匿名内部类的方式
@Override
public void onMessage(Message message) {
if(null!=message&&message instanceof TextMessage){
//如果message不为空且message为TextMessage类的话执行下面内容
TextMessage textMessage= (TextMessage)message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();
//给消费者一点时间消费完消息,
//不然刚连接上还没开始消费就直接被下面给关了,导致消息没有收到;
messageConsumer.close();
session.close();
connection.close();
消费者3大消费情况
情况1:
首先生产消息,之后启动1号消费者,1号消费者能消费到消息吗?毫无疑问,肯定是能;
情况2:
首先生产消息,先启动1号消费者,再启动2号消费者,2号消费者还能消费到消息吗?
1号消费者把消息都给吃光了,2号消费者不能消费消息了;
情况3:
先启动2个消费者,再生产6条消息,消费情况如何?
一个消费者3条消息,对半分,线程抢资源是时间片分配,产生对半分的现象;
总结
JMS开发的基本步骤
- 创建一个
connection factory
; - 通过
connection factory
来创建JMS connection
; - 启动
JMS connection
; - 通过
JMS connection
创建JMS session
; - 创建
JMS destination
(目的地 队列/主题); - 创建
JMS producer
或者创建JMS consume
并设置destination
; - 创建
JMS consumer
或者注册一个JMS message listener
; - 发送(send)或者接收(receive)
JMS message
; - 关闭所有JMS资源;
两种消费方式
同步阻塞方式(receive)
- 订阅者或接收者抵用
MessageConsumer
的receive()
方法来接收消息,receive
方法在能接收到消息之前(或超时之前)将一直阻塞。
异步非阻塞方式(监听器onMessage())
- 订阅者或接收者通过
MessageConsumer
的setMessageListener(MessageListener listener)
注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener
的onMessage(Message message)
方法。点对点消息传递域的特点
- 每个消息只能有一个消费者,类似于1对1的关系。好比个人快递自己领自己的。
- 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息。好比我们发送短信,发送者发送后不见得接收者即收即看。
- 消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息。
1对多上手案例-Topic主题
特点
- 生产者将消息发布到topic中,每个消息可以有多个消费者,属于
1:N
的关系; - 生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。
- 生产者生产时,topic不保存消息,它是无状态的,不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。
- JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅。
发布主题生产者
示例代码
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsumer_Topic {
public static final String ACTIVEMQ_URL="tcp://124.70.84.192:61616";
public static final String TOPIC_NAME="topic_lemon";
//先写死连接地址,tcp协议,以“tcp://”开头,端口号写后端进程端口号
public static void main(String[] args) throws Exception {
//1、创建连接工厂,根据url地址
//如果用户名密码都是默认的admin的话,
//连接可以用一个参数的构造方法,不然要用三个参数的构造方法,用户名密码url;
ActiveMQConnectionFactory activeMQConnectionFactory=
new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2、通过连接工厂,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
/*3、创建会话session;参数1:是否开启事务;参数2:签收;*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地(具体是队列queue还是主题topic),参数填主题名字;
Topic topic = session.createTopic(TOPIC_NAME);
//5、创建消息消费者
MessageConsumer messageConsumer = session.createConsumer(topic);
System.out.println("1号消费者");
//使用lambda代替匿名内部类,来构建监听器
messageConsumer.setMessageListener(message -> {
if(null!=message&&message instanceof TextMessage){
//如果message不为空且message为TextMessage类的话执行下面内容
TextMessage textMessage= (TextMessage)message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
//给消费者一点时间消费完消息,
//不然刚连接上还没开始消费就直接被下面给关了,导致消息没有收到;
messageConsumer.close();
session.close();
connection.close();
}
}
activeMQ控制台
注意:先启动订阅者再启动生产者,不然发送的消息是废消息;
同一程序运行多次,使得而产生多个消息消费者;
主题订阅消费者
示例代码
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce_Topic {
public static final String ACTIVEMQ_URL="tcp://124.70.84.192:61616";
public static final String TOPIC_NAME="topic_lemon";
//先写死连接地址,tcp协议,以“tcp://”开头,端口号写后端进程端口号
public static void main(String[] args) throws JMSException {
//1、创建连接工厂,根据url地址
//如果用户名密码都是默认的admin的话,
//连接可以用一个参数的构造方法,不然要用三个参数的构造方法,用户名密码url;
ActiveMQConnectionFactory activeMQConnectionFactory=
new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2、通过连接工厂,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
/*3、创建会话session
参数1:是否开启事务;参数2:签收;*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地(具体是队列queue还是主题topic),参数填主题名字;
Topic topic = session.createTopic(TOPIC_NAME);
//5、创建发布主题的生产者(提供者)
MessageProducer messageProducer = session.createProducer(topic);
//6、通过使用发布主题的生产者生产几条消息发送到MQ的主题里面
for (int i=1;i<7;i++){
//7、创建主题(发送字符串)
TextMessage textMessage = session.createTextMessage(
"《主题》我发送的第" + i + "条消息");
//8、发送给MQ
messageProducer.send(textMessage);
}
//关闭资源(正着开启,倒着关闭)
messageProducer.close();
session.close();
connection.close();
System.out.println("topic发布成功并连接中断");
}
}
activeMQ控制台
topic与queue对比总结
两大模式特征
两大模式比较
比较项目 | Topic模式队列 | Queue模式队列 |
---|---|---|
工作模式 | “订阅-发布”模式,如果当前没有订阅者,消息将会被丢弃。如果有多个订阅者,那么这些订阅者都会收到消息 | “负载均衡”模式,如果当前没有消费者,消息也不会丢弃;如果有多个消费者,那么一条消息也只会发送至其中一个消费者,并且要求消费者ack信息。 |
有无状态 | 无状态 | Queue数据默认会在mq服务器上以文件形式保存,比如ActiveMQ—般保存在SAMQ HOME\\datalkr-storeldata 下面。也可以配置成DB存储。 |
传递完整性 | 如果没有订阅者,消息会被丢弃 | 消息不会丢弃 |
处理效率 | 由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异 | 由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会有明显降低。当然不同消息协议的具体性能也是有差异的 |
JMS规范和落地产品
JavaEE是什么
JavaEE是一套使用Java进行企业级应用开发的大家一致遵循的13个核心规范工业标准。JavaEE平台提供了一个基于组件的方法来加快设计,开发。装配及部署企业应用程序。
- JDBC(Java Databease)数据库连接
- JNDI(Java Naming and Directory Interfaces)Java的命令和目录接口
- EJB(Enterprise JavaBean)
- RMI(Remote Method Invoke)远程方法调用
- Java IDL(Interface Description Language)/CORBA(Common Object Broker Architecture)接口定义语言/共用对象请求代理程序体系结构
- JSP(Java Server Page)
- Servlet
- XML(Extensible Markup Language)可标记白标记语言
- JMS(Java Message Service)Java消息服务
- JTA(Java Transaction API)Java事务API
- JTS(Java Transaction Service)Java事务服务
- JavaMail
- JAF(JavaBean Activation Framework)
JMS是什么
JMS(Java Message Service)是Java消息服务,是JavaEE中的一个技术;
什么是Java消息服务?
Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。MQ中间件的其他落地产品
特性 | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
---|---|---|---|---|
PRODUCER-CUMSUMER | 支持 | 支持 | 支持 | 支持 |
PUBLISH-SUBSCRIBE | 支持 | 支持 | 支持 | 支持 |
REQUEST-REPLY | 支持 | 支持 | - | 支持 |
API完备性 | 高 | 高 | 高 | 低(静态配置) |
多语言支持 | 支持,Java优先 | 语言无关 | 支持,Java优先 | 支持 |
单机吞吐量 | 万级 | 万级 | 十万级 | 单机万级 |
消息延迟 | -(极低的概率会) | 微秒级 | 毫秒级 | -(极低的概率会) |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 高 |
消息丢失 | -(极低的概率会) | 低 | 理论上不会丢失 | -(极低的概率会) |
消息重复 | -(极低的概率会) | 可控制 | 理论上会有重复 | -(极低的概率会) |
文档的完备性 | 高 | 高 | 高 | 中 |
提供快速入门 | 有 | 有 | 有 | 无 |
首次部署难度 | -(极低的概率会) | 低 | 中 | 高 |
JMS的组成和特点
JMS组成的四大元素
JMS Provider
实现JMS接口和规范的消息中间件,也就是我们说的MQ服务器;
JMS Producer
JMS Consumer
JMS Message
消息头
JMSDestination
JMSDeliveryMode
持久模式和非持久模式。
- 一条持久性的消息:应该被传送一次仅仅一次,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。
一条非持久的消息:最多会传递一次,这意味着服务器出现故障,该消息将会永远丢失。
JMSExpiration
消息过期时间;
可以设置消息在一定时间后过期,默认是永不过期。
- 消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。
- 如果timeToLive值等于0,则JMSExpiration被设为0,表示该消息永不过期。
如果发送后,在消息过期时间之后还没有被发送到目的地,则该消息被清除。
JMSPriority
消息优先级,从0-9十个级别,0-4是普通消息5-9是加急消息。
JMS不要求MQ严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。默认是4级。JMSMessageID
消息体
设置:
**session.createXXXXMessage**
封装具体的消息数据,有5种消息体格式;
**TxtMessage**
:普通字符串消息,包含一个String;**MapMessage**
:一个Map类型的消息,key为Strng类型,而值为Java基本类型;mapMessage.setString(key,value);
:设置键和值;
**BytesMessage**
:二进制数组消息,包含一个byte[];**StreamMessage**
:Java数据流消息,用标准流操作来顺序填充和读取;**ObjectMessage**
:对象消息,包含一个可序列化的Java对象;
-
消息属性
如果需要除消息字段以外的值,那么可以使用消息属性;
- 作用:识别/去重/重点标注等操作非常有用的方法;
消息属性是:
- 他们是以属性名和属性值对的形式制定的。可以将属性视为消息头的扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。
- 消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。
/*消息生产者*/ textMessage.setStringProperty("c01","vip"); textMessage.setIntProperty("vip",8); textMessage.setBooleanProperty("vip8",true); //textMessage.setByteProperty(...); //... /*消息消费者*/ String c01 = textMessage.getStringProperty("c01"); int vip = textMessage.getIntProperty("vip"); boolean vip8 = textMessage.getBooleanProperty("vip8"); //textMessage.setByteProperty(...); //...
JMS的可靠性
PERSISTENT
持久性参数设置说明
非持久
当MQ服务器宕机,消息不存在。messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
持久
当MQ服务器宕机,消息依然存在。messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT)
queue的持久
操作
设置持久:messageProducer.setDeliveryMode(DeliveryMode._PERSISTENT_);
设置不持久:messageProducer.setDeliveryMode(DeliveryMode._NON_PERSISTENT_);
不设置默认持久;总结
持久化消息:
这是队列的默认传递模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。
可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。
Topic的持久
主题订阅者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class JmsConsumer_Topic_persist { public static final String ACTIVEMQ_URL="tcp://124.70.84.192:61616"; public static final String TOPIC_NAME="topic_lemon"; public static void main(String[] args) throws Exception { ActiveMQConnectionFactory activeMQConnectionFactory= new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); //连接先不启动; connection.setClientID("z3");//设置订阅者id Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(TOPIC_NAME); TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "备注"); //设置持久化的订阅者,第一个参数填目的地(队列或者主题),第二个参数填备注,备注不重要 connection.start();//此时连接启动 System.out.println("1号消费者z3"); Message message = topicSubscriber.receive();//等待消息(不写参数一直等) while (null!=message){ TextMessage textMessage= (TextMessage) message; System.out.println("订阅的消息:"+textMessage.getText()); message=topicSubscriber.receive(); //消费完消息之后继续等待消息,达到类似于监听的作用; } session.close(); connection.close(); } }
主题发布者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.UnsupportedEncodingException; public class JmsProduce_Topic_persist { public static final String ACTIVEMQ_URL="tcp://124.70.84.192:61616"; public static final String TOPIC_NAME="topic_lemon"; public static void main(String[] args) throws JMSException, UnsupportedEncodingException { ActiveMQConnectionFactory activeMQConnectionFactory= new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(TOPIC_NAME); MessageProducer messageProducer = session.createProducer(topic); messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start();//与queue设置持久化不同,topic是在设置完持久化之后再启动 for (int i=1;i<3;i++){ TextMessage textMessage = session.createTextMessage( "《主题》我发送的第" + i + "条消息"); messageProducer.send(textMessage); } messageProducer.close(); session.close(); connection.close(); System.out.println("topic发布成功并连接中断"); } }
web控制台
注意
订阅者先订阅一次(必须),然后离线,订阅者离线期间发布者发布内容,订阅者再次上线就可以收到消息,这就是主题的持久性,类似属于微信公众号;
订阅者再次上线时,其
connection.setClientID("xxx")
订阅者id,目的地,备注,必须与第一次订阅时一摸一样,这样再次上线时创建出来的订阅者(TopicSubscriber)才是上次的订阅者,才可接收到其离线期间发布者发布的内容;不然创建的是一个新的订阅者,那就是第一次订阅了。Transaction
事务生产者
1、关闭事务:
Session session=connection.createSession(false, Session._AUTO_ACKNOWLEDGE_);
只要执行send,就进入到队列中;
- 关闭事务,那第2个签收参数的设置需要有效;
2、开启事务:Session session=connection.createSession(true, Session._AUTO_ACKNOWLEDGE_);
- 先执行send再执行commit【
session.commit()
】,消息才被真正提交到队列中; - 执行
session.rollback()
,事务回滚; - 消息需要需要批量提交,需要缓冲处理;
消费者
消费者中也可以开启事务:Session session=connection.createSession(true, Session._AUTO_ACKNOWLEDGE_);
①当消费者中没有开启事务时,消费消息之后web控制台可以显示出消息已消费,并且如果消费者再次消费的话就无消息可消费了,这是正常现象;
②当消费者中开启事务后,直接对消息进行消费:
- idea控制台呈现出已经消费的消息:
- 但web控制台显示消息未消费:
- 这就是消息重复现象;消费者如果开启事务,在消费消息之后需要
session.commit();
进行事务提交才能避免消息重复现象;
注意:消费者中如果开启事务的话就不能用消息监听器MessageListener
消费消息,因为消息监听器消费事务是异步的,也就是说程序运行顺序和你写的顺序不一样,就会出现先session.commit();
再消费消息,也就会出现消息重复现象;
Acknowledge签收
非事务模式下消费者签收介绍
Session session = connection.createSession(false, Session.XXX);
Session._AUTO_ACKNOWLEDGE_
:自动签收(默认);Session._CLIENT_ACKNOWLEDGE_
:手动签收;- 客户端调用
textMessage.acknowledge();
对逐条消息进行逐条签收,不然仍会出现消息重复现象;
- 客户端调用
Session._DUPS_OK_ACKNOWLEDGE_
:允许重复签收;Session._SESSION_TRANSACTED_
:事务级,需要第一个参数为true,也就是开启事务,这个参数只是语法上需要,作用不大;有事务模式下消费者签收介绍
已知如果开启事务的话需要
session.commit()
提交事务才能避免消息重复消费;- 如果
Session._CLIENT_ACKNOWLEDGE_
手动签收的话需要textMessage.acknowledge();
对逐条消息进行逐条签收才能避免消息重复消费;开启事务且开启手动签收
- 事务提交
session.commit();
但没有手动签收textMessage.acknowledge();
消息仍然会被消费; - 手动签收
textMessage.acknowledge();
但没有事务提交session.commit();
消息不会被消费; 因为开启事务且开启手动签收的情况下,优先考虑事务,也就是说如果带事务的话一定要
commit
提交,其他无所谓,以事务为主;如果在Session关闭时有部分消息被收到但还没有被签收(acknowledge),那当消费者下次连接到相同的队列时,这些消息还会被再次接收;
- 队列可以长久的保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的链接状态,充分体现了异步传输模式的优势;
发布订阅总结
非持久订阅
- 非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收发到某个主题的消息。
- 如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。
一句话:先订阅注册和MQ保持连接状态才能接受到发布,只给和MQ保持连接状态的订阅者发布消息。
持久订阅
客户端首先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户再次连接到MQ的时候,会根据消费者的ID得到所有当自己处于离线时发送到主题的消息;
非持久订阅状态下,不能恢复或重新派送一个未签收的消息。
-
持久非持久选择
当所有的消息必须被接收,则用持久订阅。当消息丢失能够被容忍,则用非持久订阅;
ActiveMQ的Broker
介绍
相当于一个ActiveMQ服务器实例(相当于之前我们在Linux上装的activemq)。
- 说白了,Broker其实就是实现了用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动,(就是把activemq嵌到我们自己的java程序里面)
-
配置conf下的配置文件
回忆阳哥之前讲的Redis,不同config配置文件来模拟不同的实例;
- activemq的配置文件是
**activemq.xml**
;
例如:我们把activemq.xml
复制一份为activemq02.xml
,然后按照activemq02.xml
配置文件启动;
命令(在bin目录下):./activemq start xbean:file:/路径/activemq02.xml
嵌入式Broker
- 用ActiveMQ Broker作为独立的消息服务器来构建Java应用。
ActiveMQ也支持在vm虚拟机中通信基于嵌入的broker,能够无缝的集成其他java应用。
pom.xml
<dependencies> <!--activemq所需要的jar包--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.9</version> </dependency> <!--activemq和spring整合的基础包 --> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.16</version> </dependency> <!--jackson依赖--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.11.2</version> </dependency> </dependencies>
代码
public class EmbedBroker{ public static void main(String[] args) throws Exception{ BrokerService brokerService=new BrokerService(); brokerService.setUseJmx(true); brokerService.addConnector("tcp://localhost:61616"); //这里自己就是消息服务器,所以填本地localhost; brokerService.start(); } }
测试
消息生产者和消息消费者均使用路径
public static final String _ACTIVEMQ_URL_="tcp://localhost:61616";
-
注意
这个只是应急用,真正生产环境用的还是linux上部署的activemq;
整合ActiveMQ
spring整合ActiveMQ
pom.xml
<dependencies> <!-- activemq核心依赖包 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.9</version> </dependency> <!-- activemq连接池 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.10</version> </dependency> <!-- spring支持jms的包 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>5.2.1.RELEASE</version> </dependency> <!--spring相关依赖包--> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.16</version> </dependency> <!-- Spring核心依赖 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>5.3.16</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.3.16</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> <version>5.3.16</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-orm</artifactId> <version>4.3.23.RELEASE</version> </dependency> </dependencies>
配置文件applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd"> <!--扫描包--> <context:component-scan base-package="com.atguigu.activemq"/> <!--配置生产者--> <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <!--真正可以生产Connection的ConnectionFactory,由对应的JMS服务商提供--> <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://124.70.84.192:61616"/> </bean> </property> <!--最大连接数--> <property name="maxConnections" value="100"/> </bean> <!--这个是队列目的地,点对点的Queue--> <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue"> <!--通过构造注入Queue名--> <constructor-arg index="0" value="spring-active-queue"/> </bean> <!--这个是队列目的地, 发布订阅的主题Topic--> <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="spring-active-topic"/> </bean> <!--Spring提供的JMS工具类,他可以进行消息发送,接收等--> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!--传入连接工厂--> <property name="connectionFactory" ref="jmsFactory"/> <!--传入目的地--> <property name="defaultDestination" ref="destinationQueue"/> <!--消息自动转换器--> <property name="messageConverter"> <bean class= "org.springframework.jms.support.converter.SimpleMessageConverter"/> </property> </bean> </beans>
队列queue操作
生产者
@Service public class SpringMQ_Produce { @Autowired private JmsTemplate jmsTemplate; public static void main(String[] args) { ApplicationContext ctx= new ClassPathXmlApplicationContext("applicationContext.xml"); SpringMQ_Produce produce = (SpringMQ_Produce) ctx.getBean("springMQ_Produce"); //这个接口里只有一个方法,可以用lambda表达式; produce.jmsTemplate.send(session -> { TextMessage textMessage = session.createTextMessage("spring和ActiveMQ整合啦!!!"); return textMessage; }); System.out.println("发送完毕;"); } }
消费者
@Service public class SpringMQ_Consumer { @Autowired JmsTemplate jmsTemplate; public static void main(String[] args) { ApplicationContext ctx= new ClassPathXmlApplicationContext("applicationContext.xml"); SpringMQ_Consumer consumer=(SpringMQ_Consumer) ctx.getBean("springMQ_Consumer"); String text= (String) consumer.jmsTemplate.receiveAndConvert(); //观察源码发现.receiveAndConvert()方法已经做了getText()操作了 System.out.println("消费者收到的消息:"+text); } }
主题Topic操作
applicationContext.xml
在配置文件中更改JMS工具类的传入目的地即可,其余生产者和消费者的代码同队列queue一样;
<!--这个是队列目的地, 发布订阅的主题Topic--> <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="spring-active-topic"/> </bean> <!--Spring提供的JMS工具类,他可以进行消息发送,接收等--> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> ... <!--传入目的地--> <property name="defaultDestination" ref="destinationTopic"/> ... </bean>
监听器配置
上面的方法都是用同步阻塞方式进行监听。
- 这里在spring里面实现消费者不启动,直接通过配置监听完成;
applicationContext.xml
<!--配置监听程序--> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" id="jmsContainer"> <!--传入连接工厂--> <property name="connectionFactory" ref="jmsFactory"/> <!--传入目的地--> <property name="destination" ref="destinationTopic"/> <!--配置监听器--> <property name="messageListener" ref="myMessageListener"/> </bean>
messageListener实现类
@Component public class myMessageListener implements MessageListener { @Override public void onMessage(Message message) { if(null!=message&&message instanceof TextMessage){ TextMessage textMessage=(TextMessage) message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }
测试
只启动生产者,不启动消费者,控制台可正常消费消息,不会出现废消息;springboot整合ActiveMQ
pom.xml
<!--导入activemq场景--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> <version>2.6.6</version> </dependency>
queue队列
application.yaml
server: port: 7777 spring: activemq: broker-url: tcp://124.70.84.192:61616 user: admin password: admin jms: pub-sub-domain: false #这里选择目的地是queue还是topic(false代表queue,true代表topic, #不写默认false,也就是不写默认队列) myqueue: boot-activemq-queue #自定义queue名称(自定义属性名)
生产者
配置bean
@EnableJms//开启jms适配的注解 @Component public class ConfigBean { @Value("${myqueue}")//与配置文件中的myqueue匹配; private String myQueue; @Bean public Queue queue(){ //注意这里是import javax.jms.Queue; return new ActiveMQQueue(myQueue); } }
固定发送一条消息
创建自定义生产者组件Queue_Produce
@Component public class Queue_Produce { @Autowired private JmsMessagingTemplate template; @Autowired private Queue queue; //注意这里是import javax.jms.Queue; public void produceMsg(){ template.convertAndSend(queue, "UUID:"+ UUID.randomUUID().toString().substring(0,6)); } }
测试
@Autowired private Queue_Produce queue_produce; @Test void text1() {queue_produce.produceMsg();}
生产者间隔投递
新需求:要求每隔3秒向mq推送消息Queue_Produce新增定投方法
//间隔3s定投方法 @Scheduled(fixedDelay = 3000)//单位ms public void produceMsgScheduled(){ template.convertAndSend(queue, new Date().toString()); System.out.println("成功投放1条消息!!!"); }
主程序类增加注解
@EnableScheduling//开启定投注解功能 @SpringBootApplication public class ActivemqSpringbootApplication { public static void main(String[] args) { SpringApplication.run(ActivemqSpringbootApplication.class, args); } }
测试
之后启动主程序类,程序就会每隔3秒向activemq投放一条消息(new Date()现在时间);消费者
自定义组件Queue_Consumer
之后启动主启动类,就相当于开启了消息监听器,可以一直对消息进行监听;@Component public class Queue_Consumer { @JmsListener(destination = "${myqueue}") public void receive(TextMessage textMessage)throws JMSException{ System.out.println("收到消息:"+textMessage.getText()); } }
topic主题
application.yaml
server: port: 6666 spring: activemq: broker-url: tcp://124.70.84.192:61616 user: admin password: admin jms: pub-sub-domain: true #自己定义主题名称 myTopic: boot-activemq-topic
主题生产者
配置类
配置类定义主题的名称@Component public class ConfigBean { @Value("${myTopic}") private String topicName; @Bean public Topic topic(){ return new ActiveMQTopic(topicName); } }
自定义推送主题组件
实现定时推送功能,每隔一秒推送一次@Component public class ConfigBean { @Value("${myTopic}") private String topicName; @Bean public Topic topic(){ return new ActiveMQTempTopic(topicName); } }
主程序类
主程序类添加**@EnableScheduling**
注解;@EnableScheduling @SpringBootApplication public class ActivemqSpringbootTopicProduceApplication { public static void main(String[] args) { SpringApplication.run(ActivemqSpringbootTopicProduceApplication.class, args); } }
主题消费者
Topic_Consumer
之后启动主程序类即可【先启动主题消费者,在启动主题生产者】;@Service public class Topic_Consumer { @JmsListener(destination = "${myTopic}") public void receive(TextMessage textMessage)throws JMSException{ System.out.println(textMessage.getText()); } }
ActiveMQ的传输协议
简介
——>>>官方介绍
TCP:允许客户端远程连接activemq,使用一个TCP的socker网络连接;
NIO:类似于TCP传输协议,但是用了非阻塞型IO,可以提供更高的性能;ActiveMQ默认的传输协议
ActiveMQ支持的client-broker通讯协议有:TCP、NIO、UDP、SSL、Http(s)、VM。
协议 | 描述 |
---|---|
TCP | 默认的协议,性能相对可以 |
NIO | 基于TCP协议之上的,进行了扩展和优化,具有更好的扩展性 |
UDP | 性能比TCP更好,但是不具有可靠性 |
SSL | 安全链接 |
HTTP(S) | 基于HTTP或者HTTPS |
VM | VM本身不是协议,当客户端和代理在同一个Java虚拟机(VM)中运行时,他们之间需要通信,但不想占用网络通道,而是直接通信,可以使用该方式 |
其中配置Transport Connector
的文件在ActiveMQ安装目录的conf/activemq.xml
中的<transportConnectors>
标签之内。见下面实际配置:
<!--activemq自带支持的协议-->
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<!--DOS protection, limit concurrent connections to 1000 and frame size to 100MB-->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616
?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672
?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613
?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883
?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614
?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
在上文给出的配置信息中,URI描述信息的头部都是采用协议名称:例如:
- 描述amqp协议的监听端口时,采用的URI描述格式为
amqp://······
; - 描述Stomp协议的监听端口时,采用URI描述格式为
stomp://······
;
唯独在进行openwire协议描述时,URI头却采用的tcp://······
。这是因为ActiveMQ中默认的消息协议就是openwire;
ActiveMQ的传输协议有哪些
1、(TCP)Transmission Control Protocol默认
- 这是默认的Broker配置,TCP的Client监听端口61616;
- 在网络传输数据前,必须要先序列化数据,消息是通过一个叫wire protocol的协议来序列化成字节流。默认情况下ActiveMQ把wire protocol叫做protocol,它的目的是促使网络上的效率和数据快速交互;
- TCP连接的URI形式如:
**tcp://HostName:port?key1=value1&key2=value2**
,后面的参数是可选的。 - TCP传输的的优点:
- TCP协议传输可靠性高,稳定性强;
- 高效率:字节流方式传递,效率很高;
- 有效性、可用性:应用广泛,支持任何平台;
关于Transport协议的可选配置参数可以参考—>官网;
2、(NIO)New I/O API Protocol
NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务器端有更多的负载。
- 适合使用NIO协议的场景:
- 可能有大量的Client去连接到Broker上,一般情况下,大量的Client去连接Broker是被操作系统的线程所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议。
- 可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。
- NIO连接的URI形式:
**nio://hostname:port?key1=value1&key2=value2**
; - 关于Transport协议的可选配置参数可以参考—>>官网
3、AMQP协议
Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件限制。
4、Stomp协议
STOP,Streaming Text Orientation Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息中间件)设计的简单文本协议。
5、(SSL)Secure Sockets Layer Protocol
安全加固协议,见—->官网
6、MQTT协议
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当作传感器和致动器(比如通过Twitter让房屋联网)的通信协议。
7、WS协议(websocket)
NIO案例
修改activemq.xml配置文件
- 如果你不特别指定ActiveMQ的网络监听端口,那么这些端口都讲使用BIO网络IO模型。
- 所以为了首先提高单节点的网络吞吐性能,我们需要明确指定ActiveMQ网络IO模型。
如下所示:URI格式头以
nio
开头,表示这个端口使用以TCP协议为基础的NIO网络IO模型。<transportConnectors> ...... <!--上面是默认自带的协议--> <transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true"/> </transportConnectors>
生产者及消费者的修改
只需修改URL连接地址即可:
String _ACTIVEMQ_URL_="**nio**://124.70.84.192:**61618**";
NIO加强
上面的是activemq.xml中配置:
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true"/>
URI格式以
nio
开头,代表这个端口使用TCP协议为基础的NIO网络模型。- 但是这样的设置方式,只能使这个端口支持Openwire协议。
- 也就是说:
我们怎么能够让这个端口既支持NIO网络模型,又让他支持多个协议呢?
解决:
—->>官网介绍
在conf/activemq.xml
配置中进行以下配置:
- 使用
auto
关键字(auto
可代表activemq出厂带的所有协议); - 使用
+
来为端口设置多种特性;如下:
设置端口号为61618
<transportConnectors>
......
<!--上面是默认自带的协议-->
<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61618
?maximumConnections=1000
&wireFormat.maxFrameSize=104857600
&org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20
&org.apache.activemq.transport.nio.Se1ectorManager.maximumPoo1Size=50
"/>
</transportConnectors>
格式:<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:port?参数.."/>
activemq控制台:
java代码:
此时链接地址String _ACTIVEMQ_URL_="**tcp**://124.70.84.192:**61618**";
或者String _ACTIVEMQ_URL_="**nio**://124.70.84.192:**61618**";
都可以成功连接activemq;
ActiveMQ的消息存储和持久化
可持久化
MQ的高可用体现在:
- 事务;持久;签收;可持久化;
- 其中(事务,持久,签收)是MQ自带;
- 而(可持久化)可以借助于外力;
—->>官网介绍
activemq主要持久化机制有两个:LevelDB
和KahaDB
面试题
ActiveMQ的持久化机制
就是高可用的保障手段,简单来说就是MQ服务器宕机后,消息不会丢失;
- 为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一半都会采用持久化机制。
ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的:
- 就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试发送。
- 消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。
持久化机制
AMQ Mesage Store(了解)
AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储再一个个文件中文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本,是以前的默认消息存储,现在不用了KahaDB消息存储(默认)
基于日志文件(类似于redis里的aof持久化机制),从ActiveMQ5.4开始默认的持久化插件
KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。
- 消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。
- KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模型进行了优化。
- 数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。
**db-number.log**
:KahaDB存储消息到预定大小的数据纪录文件中,文件名为db-number.log
。当数据文件已满时,一个新的文件会随之创建,number数值也会随之递增,它随着消息数量的增多,如每32M一个文件,文件名按照数字进行编号,如db-1.log,db-2.log······。当不再有引用到数据文件中的任何消息时,文件会被删除或者归档。**db.data**
:该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-number。log里面存储消息。**db.free**
:当问当前db.data文件里面哪些页面是空闲的,文件具体内容是所有空闲页的ID。**db.redo**
:用来进行消息恢复,如果KahaDB消息存储再强制退出后启动,用于恢复BTree索引。**lock**
:文件锁,表示当前kahadb独写权限的broker;JDBC消息存储
消息基于JDBC存储的,即这些消息有些会真实记录到我们的mysql或oracle数据库;LevelDB消息存储(了解)
这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性。但它不使用自定义B-Tree实现来索引独写日志,而是使用基于LevelDB的索引;JDBC存储消息操作
1、添加mysql和Druid驱动包
添加mysql和Druid驱动包到MQ服务器的activemq安装位置的apache-activemq-5.16.4/lib/
目录下:2、做jdbc持久化的配置
在activemq.xml
配置文件中将原来kahaDB持久化配置修改为jdbc持久化配置,如下:<persistenceAdapter> <!-- <kahaDB directory="${activemq.data}/kahadb"/>--> <jdbcPersistenceAdapter dataSource="#my-ds" createTablesOnStartup="false"/> </persistenceAdapter>
- dataSource:指定将要引用的持久化数据库的bean名称(my-ds就是bean的id属性值),
#
是引用符,类似于spring中一个bean引用别的bean的ref; - createTablesOnStartup:是否在启动MQ的时候创建数据表,默认值是true,这样每次启动都会去重新创建数据表了,一般是第一次启动的时候设置为true之后改成false。
3、配置数据库连接池
在activemq.xml配置文件中进行配置数据库连接池,注意,位置在</broker>
标签下面,<import resource="jetty.xml"/>
标签上面(数据库名暂定为activemq);</broker> <!-- Enable web consoles, REST and Ajax APIs and demos The web consoles requires by default login, you can disable this in the jetty.xml file Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details --> <bean class="com.alibaba.druid.pool.DruidDataSource" id="my-ds"> <property name="driverClassName" value="com.mysql.cj.jdbc.Driver"></property> <property name="url" value="jdbc:mysql://124.70.84.192:3306/activemq ?relaxAutoCommit=true"></property> <property name="username" value="root"></property> <property name="password" value="12345ssdlh"></property> </bean> <import resource="jetty.xml"/>
4、建库自动生成表
- 手动创建一个数据库(名为上面配置的activemq);
只要上面配置正确无误(注意一定不要有多余空格,仔细检查),并且手动创建了activemq数据库,ActiveMQ再次重新启动时就会自动在activemq数据库中创建三张表:
- ACTIVEMQ_MSGS(Queue和Topic都存在里面):
- ID:自增的数据库主键;
- CONTAINER:消息的Destination;
- MSGID_PROD:消息发送者的主键;
- MSG_SEQ:是发送消息的顺序,
MSGID_PROD**+**MSG_SEQ
可以组成JMS的MessageID; - EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数;
- MSG:消息本体的Java序列化对象的二进制数据;
- PRIORITY:优先级,从0-9,数值越大优先级越高;
- ACTIVEMQ_ACKS(存储持久订阅的信息和最后一个持久订阅接收的消息ID):
- CONTAlINER:消息的Destination;
- SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息;
- CLIENT ID:每个订阅者都必须有一个唯一的客户端ID用以区分;
- SUB_NAME:订阅者名称;
- SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作;
- LAST ACKED ID:记录消费过的消息的ID。
ACTIVEMQ_LOCK:表ACTIVEMQ_LOCK在集群环境下才有用,只有一个Broker可以获取消息,称为
Master Broker
,其他的只能作为备份。只有等待Master Broker
不可用,才可能成为下一个Master Broker
。这个表用于记录哪个Broker是当前的Master Broker
;当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中;
- 当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。
- 而且点对点类型中消息一旦被Consumer消费,就从broker持久库中删除;
-
ACTIVEMQ_ACKS(主题topic)
1、先启动消费者订阅再运行生产者,之后两者均离线
2、此时再查看ACTIVEMQ_MSGS表,会一直保存主题发布的内容
3、ACTIVEMQ_MSGS表中,Queue和Topic都存在里面,只不过对于Queue消息消费掉就没了,而对于Topic则会一直保存;8、小总结
如果我们在activemq控制台把刚才的订阅者给删了:
那么ACTIVEMQ_ACKS表中该订阅者信息将被清除:
而ACTIVEMQ_MSGS表里的信息不会被清空:9、开发的坑
在配置关系型数据库作为ActiveMQ的持久化存储方案时,有坑;
1、数据库jar包: 注意把对应版本的数据库jar或者你自己使用的非自带的数据库连接池jar包
2、createTablesOnStartup
属性:
- 默认为true,每次启动activemq都会自动创建表,在第一次启动后,应改为false,避免不必要的损失。
3、下划线坑:
如果报错**java.lang IllegalStateException:BeanFactory not initialized or already closed**
,这是因为您的操作系统的机器名中有“_”符号。请更改机器名并且重启后即可解决问题。
JDBC Message store with ActiveMQ Journal
是什么
- 这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库。
- ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。
- 当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
举个例子:
生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。
配置
<!--之前默认的注掉
<persistenceAdapter>-->
<!-- <kahaDB directory="${activemq.data}/kahadb"/>-->
<!-- <jdbcPersistenceAdapter dataSource="#my-ds" createTablesOnStartup="false"/>
</persistenceAdapter>
-->
<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#my-ds"
dataDirectory="activemq-data"/>
</persistenceFactory>
这样配置就是高速缓存+mysql,当消息生产者生产消息之后,消息会立马同步到journal缓存中,journal缓存会等7,8分钟左右时间,如果期间没有消费者消费消息,journal缓存才会把消息同步到mysql里面,如果期间消息被消费完毕,那就不会再同步到mysql。
总结
1、持久化消息主要指的是:
- MQ所在服务器宕机了消息不会丢试的机制。
2、持久化机制演变的过程:
从最初的AMQ Message Store
方案到ActiveMQ V4版本推出的High Performance Journal
(高性能事务支持)附件,并且同步推出了关于关系型数据库的存储方案。ActiveMQ5.3版本又推出了对KahaDB的支持(5.4版本后被作为默认的持久化方案),后来ActiveMQ 5.8版本开始支持LevelDB,到现在5.9提供了标准的Zookeeper+LevelDB集群化方案。
3、ActiveMQ消息持久化机制有:
AMQ:基于日志文件;
KahaDB: 基于日志文件,从ActiveMQ5.4开始默认使用;
JDBC:基于第三方数据库;
Replicated LevelDB Store 从5.9开始提供了LevelDB和Zookeeper的数据复制方法,用于Master-slave方式的首选数据复制方案。
4、无论使用哪种持久化方式,消息的存储逻辑都是一致的∶
就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试。消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。
高级特性和大厂常考重点
异步投递Async Sends
—->官网
对于一个Slow Consumer(慢消费者),使用同步发送消息可能出现Producer堵塞的情况,慢消费者适合使用异步发送;
- ActiveMQ支持同步,异步两种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎么样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著提高发送的性能。
- ActiveMQ默认使用异步发送的模式:除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。
- 如果你没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻塞producer,直到broker返回一个确认,表示消息已经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。
- 很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。
异步投递:
它可以最大化producer端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过这也带来了额外的问题,
- 就是需要消耗更多的Client端内存同时也会导致broker端性能消耗增加;
此外它不能有效的确保消息的发送成功。在
**userAsyncSend=true**
的情况下客户端需要容忍消息丢失的可能。配置方法
方法1:
... public static final String ACTIVEMQ_URL= "tcp://124.70.84.192:61616?jms.useAsyncSend=true"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory= new ActiveMQConnectionFactory(ACTIVEMQ_URL); ...
方法2:
... public static final String ACTIVEMQ_URL="tcp://124.70.84.192:61616"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory= new ActiveMQConnectionFactory(ACTIVEMQ_URL); activeMQConnectionFactory.setUseAsyncSend(true); ...
异步消息如何确定发送成功?
异步发送丢失消息的场景是:生产者设置
userAsyncSend=true
,使用producer.send(msg)
持续发送消息。- 如果消息不阻塞,生产者会认为所有send的消息均被成功发送至MQ。
- 如果MQ突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。
- 所以,正确的异步发送方法是需要接收回调的。
- 同步发送和异步发送的区别就在此;
- 同步发送等send不阻塞了就表示一定发送成功了;
- 异步发送需要客户端回执并由客户端再判断一次是否发送成功;
这样就能查出那条消息没有发送成功,再进行人工干预重新发送等;public static final String ACTIVEMQ_URL="tcp://124.70.84.192:61616"; public static final String QUEUE_NAME="queue_jdbc"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory= new ActiveMQConnectionFactory(ACTIVEMQ_URL); activeMQConnectionFactory.setUseAsyncSend(true);//开启异步投递 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer)session.createProducer(queue); //将MessageProducer类型强转为ActiveMQMessageProducer类型 for (int i=1;i<4;i++){ TextMessage textMessage = session.createTextMessage("我发送的第" + i + "条消息"); textMessage.setJMSMessageID(UUID.randomUUID().toString()+"--atguigu"); //消息头设置一个JMSMessageID String msgid = textMessage.getJMSMessageID(); activeMQMessageProducer.send(textMessage, new AsyncCallback() { //2个形参的send()方法 @Override//消息发送成功了怎样 public void onSuccess() {System.out.println(msgid+":发送成功");} @Override//消息发送失败了怎样 public void onException(JMSException e) { System.out.println(msgid+":发送失败"); } }); } activeMQMessageProducer.close(); session.close(); connection.close(); System.out.println("连接中断"); }
延迟发送和定时投递
—->>官网介绍
即:一组消息延迟多少时间进行投递,每两次之间重复投递的时间间隔还有就是重复投递多少次;
注意:重复投递次数指的是从第二次重复投递往后的次数,即这组消息投递总次数=1+重复投递次数
;四大消息属性
| AMQ_SCHEDULED_DELAY | long | 延迟投递的时间 | | —- | —- | —- | | AMQ_SCHEDULED_PERIOD | long | 重复投递的时间间隔 | | AMQ_SCHEDULED_REPEAT | int | 重复投递次数 | | AMQ_SCHEDULED_CRON | String | Cron表达式 |
案例演示
1、要在activemq.xml中的<broker>
标签中配置schedulerSupport属性为true:
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost"
dataDirectory="${activemq.data}"
schedulerSupport="true">
2、Java代码里面封装的辅助消息类型:ScheduledMessage
...
//设置延迟投递属性值
long delay=3*1000;//延迟3秒
long period=3*1000;//每4秒延迟投递一次
int repeat=5;//共延迟投递5次
for (int i=1;i<4;i++){
TextMessage textMessage = session.createTextMessage("我发送的第" + i + "条消息");
//设置消息的属性
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);
//延迟投递的时间
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,period);
//重复投递的时间间隔
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);
//重复投递次数
messageProducer.send(textMessage);
}
...
ActiveMQ消息重试机制
—>>官网介绍
指的就是,假如消费者开启了事务,但消费完了没有提交,就这样在第一次正常消费后又重复消费了6次(总共6+1=7次)这就称为消息重发机制,到第8次的时候这条消息就会被标记为有毒消息Poison ACK,并且不会被再次消费;
面试题
具体哪些情况会引发消息重发
1:Client用了transactions且再session中调用了rollback
2:Client用了transactions且再调用commit之前关闭或者没有commit
3:Client再CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover
请说说消息重发时间间隔和重发次数
间隔:1s
次数:6次
有毒消息Poison ACK的理解
有毒消息Poison ACK
一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)。
重发机制的参数
- collisionAvoidanceFactor:设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围。默认值为0.15
- maximumRedeliveries:最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。默认值为6;
- maximumRedeliveryDelay:最大传送延迟,只在useExponentialBackOff为true时有效,假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为20ms,第三次重连时间间隔为40ms,当重连时间间隔大于最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。默认为-1。
- initialRedeliveryDelay:初始重发延迟时间,默认1000L;
- redeliveryDelay:重发延迟时间,当initialRedeliveryDelay=0时生效,默认1000L;
- useCollisionAvoidance:启用防止冲突功能,默认false;
- useExponentialBackOff:启用指数倍数递增的方式增加延迟时间,默认false;
- backOffMultiplier:重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。默认是5;
修改重发机制参数
public static final String QUEUE_NAME="queue_redelivery"; public static final String ACTIVEMQ_URL="tcp://124.70.84.192:61616"; public static void main(String[] args) throws Exception { ActiveMQConnectionFactory activeMQConnectionFactory= new ActiveMQConnectionFactory(ACTIVEMQ_URL); //设置最大重传次数为3次 RedeliveryPolicy redeliveryPolicy=new RedeliveryPolicy(); redeliveryPolicy.setMaximumRedeliveries(3); activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); Connection connection = activeMQConnectionFactory.createConnection(); ....
死信队列
—->官方介绍
- 一般生产环境中在使用MQ的时候设计两个队列:一个是核心业务队列,一个是死信队列。
- 核心业务队列,就是比如上图专门用来让订单系统发送订单消息的,然后另外一个死信队列就是用来处理异常情况的。
- 假如第三方物流系统故障了无法请求,那么仓储系统每次消费到一条订单消息,尝试通知发货和配送时都会遇到对方接口报错。此时仓储系统就可以把这条消息拒绝访问或者标志位处理失败。一旦标志这条消息处理失败了之后,MQ就会把这条消息转入提前设置好的一个死信队列中。
如果保证消息不被重复消费?幂等性问题
网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能会造成重复消费,解决方法如下。
- 如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
- 准备一个第三服务方来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<
id,message
>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。