- RabbitMQ
- ActiveMQ
- RocketMQ(阿里开源)
- Kafka
- TubeMQ(腾讯开源)
RabboitMQ
搭建RabbitMQ服务器
- 克隆docker-bash:rabbitmq
- 设置ip
./ip-static
ip:192.168.64.140
- 下载rabbitmq镜像
docker pull rabbitmq:management
#或者从 code 下载 rabbit-image.gz
#上传到服务器,然后执行镜像导入
docker load -i rabbit-image.gz
准备配置文件,配置管理员用户名和密码
- 关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
- 重启docker服务
systemctl restart docker
- 添加配置文件
mkdir /etc/rabbitmq
vim /etc/rabbitmq/rabbitmq.conf
- 新增管理员用户名密码配置
default_user = admin
default_pass = admin
- 启动rabbitmq服务器
docker run -d --name rabbit \
-p 5672:5672 \
-p 15672:15672 \
-v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
rabbitmq:management
- 访问控制台
[http://192.168.64.140:15672](http://192.168.64.140:15672)
,用户名密码为admin
简单消费者和生产者简单模式
简单模式中只有一个生产者和一个消费者,生产者发送消息,消费者接收消息
创建连接
package m1;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author: 一拳超人
* @Date: 2021/10/21 17:16
*/
public class Connection {
public static com.rabbitmq.client.Connection connectionInfo() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.140");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
return connectionFactory.newConnection();
}
}
创建生产者
package m1;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* RabbitMQ生产者
*
* @Author: 一拳超人
* @Date: 2021/10/21 16:46
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
com.rabbitmq.client.Connection connection = m1.Connection.connectionInfo();
//通信通道
Channel channel = connection.createChannel();
/*
2.在服务器中创建队列hello world
参数说明:
1.队列名
2.是否为持久队列
3.是否为排他(独占)队列(不会被多个消费者独占)
4.是否自动删除
5.队列得其他参数属性
*/
channel.queueDeclare("HelloWorld", false, false, false, null);
/*
3.向hello word队列发送消息
参数说明:
1.交换机,空串表示默认交换机
2.队列名
3.消息属性
4.消息正文
*/
channel.basicPublish("", "HelloWorld", null, "万年的马钊".getBytes());
channel.close();
}
}
创建生产者
package m1;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* RabbitMQ生产者
*
* @Author: 一拳超人
* @Date: 2021/10/21 16:46
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
com.rabbitmq.client.Connection connection = m1.Connection.connectionInfo();
//通信通道
Channel channel = connection.createChannel();
/*
2.在服务器中创建队列hello world
参数说明:
1.队列名
2.是否为持久队列
3.是否为排他(独占)队列(不会被多个消费者独占)
4.是否自动删除
5.队列得其他参数属性
*/
channel.queueDeclare("HelloWorld", false, false, false, null);
/*
3.向hello word队列发送消息
参数说明:
1.交换机,空串表示默认交换机
2.队列名
3.消息属性
4.消息正文
*/
channel.basicPublish("", "HelloWorld", null, "万年的马钊".getBytes());
channel.close();
}
}
工作模式
工作模式由一个生产者多个消费者构成,一个生产者发送消息,多个消费者之间轮询接受消息,且每个消费者获取的消息唯一,多个消费者之间共用一个队列.
合理收发消息
合理收发消息,需要让服务器知道消费者是否处理完消息,
ACK
使用ACK(Ackonw led)发送消息回执,用以向服务器发送通知,告知服务器一条消息已经处理完毕,服务器可以通过ack知道消息是否处理完毕,消费者使用basicConsume
方法接收消息时,第二个参数为是否手动ack,true为自动确认,false为手动确认.
手动确认需要在回调对象中设置消息回执,使用basicAck(DeliveryTag,multiple)
方法.
方法参数:
- DeliveryTag:long类型,回执
- mutiple:布尔类型,表示是否确认全部消息
QOS
qos可以设置每次抓取消息的数量,在抓取的消息处理完成前不会在抓取消息,在手动ack模式下才有效.
设置方式channel.basicQos(1);
参数:prefetchCount – 服务器将传递的最大消息数,如果没有限制则为 0.
package m2;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import m1.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author: 一拳超人
* @Date: 2021/10/22 9:37
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接和通信通道
Channel channel = Connection.connectionInfo().createChannel();
//创建队列
channel.queueDeclare("HelloWorld", false, false, false, null);
//回调对象
DeliverCallback deliverCallback = (consumer, message) -> {
String s = new String(message.getBody());
System.out.println("收到" + s);
//处理消息,遍历字符串,遇到"."则暂停一秒,模拟耗时消息
for (int i = 0; i < s.length(); i++) {
if ('.' == s.charAt(i)) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("消息处理完成");
/*
处理完消息手动发送回执
参数说明:
1.DeliveryTag:long类型,回执标签
2.multiple:布尔类型,是否确认之前的收到的所有消息
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = consumerTag -> {
};
channel.basicQos(1);
/*
消费消息
参数:
1.队列名
2.autoAck,true为自动确认,false为手动确认
*/
channel.basicConsume("HelloWorld", false, deliverCallback, cancelCallback);
}
}
消息持久化
消息的持久化是指当消息从交换机发送到队列之后,被消费者消费之前,服务器突然宕机重启,消息仍然存在。消息持久化的前提是队列持久化,假如队列不是持久化,那么消息的持久化毫无意义。
- 队列持久化
- 消息数据持久化
设置持久化队列
channel.queueDeclare("HelloWorld", true, false, false, null);
参数:
- 队列名
- 是否为持久化队列
- 是否为排他队列
- 是否自动删除(服务器将在队列不使用时删除)
将一个队列设置为持久队列只需要将第二个参数设置为true
消息持久化
channel.basicPublish("", "HelloWorld", MessageProperties.PERSISTENT_BASIC, s.getBytes(StandardCharsets.UTF_8));
其中MessageProperties.PERSISTENT_BASIC
为持久化参数
发布订阅模式
将一条消息发送给所有消费者,同一条消息所有的消费者都可以收到.
交换机
- Direct 直连
- Fanout 扇出,扇形
- Topic 主题
Bus配置刷新
- 修改2,3,4,9添加依赖
- Bus
- Rabbitmq
- binder-rabbit
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-bus</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
- 09添加依赖actuator
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
- 修改09的yml配置
- 暴漏bus-refresh:
- rabbitmq连接配置
server:
port: 6001
spring:
application:
name: config-server
cloud:
config:
server:
git:
uri: https://gitee.com/G318705208/microservice-learning
search-paths: /SpringCloud01/config
username: gqc318705208@163.com
password: Gqc19981120.
rabbitmq:
host: 192.168.64.140
port: 5672
username: admin
password: admin
eureka:
client:
service-url:
defaultZone: http://eureka1:2001/eureka,http://eureka2:2002/eureka
management:
endpoints:
web:
exposure:
include: bus-refresh
- 修改2,3,4的yml配置,修改config目录的三个文件并提交
- rabbitmq连接配置
- Bus发送刷新指令,其他模块接受指令并执行刷新操作,RabbitMQ模式为主题模式.
- 链路日志发送至rabbitmq,zipkin接受链路日志,RabbitMQ模式为简单模式
订单削峰
将订单信息发送至RabbitMQ
- 新建配置类用于新建队列
Queue为org.springframework.amqp.core.Queue
包下的类,用于封装队列参数,RabbitMQ的自动配置类会自动发现Queue实例,并根据其中的参数连接服务器创建队列
import org.springframework.amqp.core.Queue;
@Configuration
public class RabbitMQConfig {
/**
* 新建Queue队列,用于封装队列参数
* RabbitMQ的自动配置类会自动发现Queue实例,
* 根据其中的参数连接服务器创建队列
*/
@Bean
public Queue orderQueue() {
return new Queue("orderQueue", true, false, false);
}
}
- 注入AmqpTemplate实例使用此实例将订单信息发送至RabbitMQ
使用amqpTamplate实例的convertAndSend
方法将订单信息发送至消息队列,此方法可以转自动将数据转换为byte[]数组并发送.第一个参数为队列名称,第二个参数为要发送的消息信息
@Service
public class OrderServiceImpl implements OrderService {
/**
* 在RabbitAutoConfiguration中自动创建了AmqpTemplate实例
*/
@Autowired
private AmqpTemplate amqpTemplate;
public String saveOrder(PdOrder pdOrder) throws Exception {
//转换(自动将数据转换为byte[]数组)并发送
amqpTemplate.convertAndSend("orderQueue", pdOrder);
}
从RabbitMQ中取出消息并进行处理
新建消息处理类
@RabbitListener
注解可以自动注册成为消费者,自动开始从队列接收消息,自动将消息发送至@RabbitHandler注解标注方法中@RabbitHandler
用于指定处理消息的方法,配和@RabbitListener
注解使用,且一个类中只能有一个orderService
实例为订单处理接口,用于处理订单信息
@Component
@RabbitListener(queues = "orderQueue")
public class OrderConsumer {
@Autowired
private OrderService orderService;
//@RabbitHandler配和@RabbitListener注解使用,用于指定处理消息的方法,且一个类中只能有一个
@RabbitHandler
public void receive(PdOrder pdOrder) throws Exception {
orderService.saveOrder(pdOrder);
System.out.println("------------订单已保存----------");
}
}
- 处理订单信息
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
PdOrderMapper pdOrderMapper;
@Autowired
PdCartItemMapper pdCartItemMapper;
@Autowired
PdItemMapper pdItemMapper;
@Autowired
PdItemParamItemMapper pdItemParamItemMapper;
@Autowired
PdShippingMapper pdShippingMapper;
@Autowired
PdOrderItemMapper pdOrderItemMapper;
public String saveOrder(PdOrder pdOrder) throws Exception {
String orderId = pdOrder.getOrderId();
PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId());
pdOrder.setShippingName(pdShipping.getReceiverName());
pdOrder.setShippingCode(pdShipping.getReceiverAddress());
pdOrder.setStatus(1);//
pdOrder.setPaymentType(1);
pdOrder.setPostFee(10D);
pdOrder.setCreateTime(new Date());
double payment = 0;
List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList());
for (ItemVO itemVO : itemVOs) {
PdOrderItem pdOrderItem = new PdOrderItem();
String id = generateId();
//String id="2";
pdOrderItem.setId(id);
pdOrderItem.setOrderId(orderId);
pdOrderItem.setItemId("" + itemVO.getPdItem().getId());
pdOrderItem.setTitle(itemVO.getPdItem().getTitle());
pdOrderItem.setPrice(itemVO.getPdItem().getPrice());
pdOrderItem.setNum(itemVO.getPdCartItem().getNum());
payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice();
pdOrderItemMapper.insert(pdOrderItem);
}
pdOrder.setPayment(payment);
pdOrderMapper.insert(pdOrder);
return orderId;
}
}
SpringBoot整合RabbitMQ
创建模块导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
- 配置RabbitMQ连接配置
spring:
rabbitmq:
host: 192.168.64.140
port: 5672
username: admin
password: admin
virtual-host: mq1
简单模式
新建一个HelloWorld队列,此队列为非持久队列,非独占,不自动删除,
new Queue("HelloWorld", false);
等同于new Queue("HelloWorld", false, false, false);
new Queue("HelloWorld")
默认为持久队列,只需要设置一个false,就可以将此队列设置为非持久队列
@Bean
public Queue HelloWorld() {
return new Queue("HelloWorld", false);
}
创建消息发送者
@Component
public class Provider {
@Autowired
private AmqpTemplate amqpTemplate;
public void send() {
amqpTemplate.convertAndSend("HelloWorld", "HelloWorld");
}
}
创建消息消费者
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = "HelloWorld")
public void receive(String str) {
log.info("收到消息:{}", str);
System.out.println(str);
}
}
工作模式(资源竞争)
创建队列:
@Bean
public Queue HelloWorld() {
return new Queue("task_queue");
}
创建生产者:
@Component
public class Provider {
@Autowired
private AmqpTemplate amqpTemplate;
public void send() {
while (true){
System.out.print("请输入消息:");
String s = new Scanner(System.in).nextLine();
amqpTemplate.convertAndSend("task_queue", s);
}
}
}
创建两个消费者:
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = "task_queue")
public void receive1(String str) {
log.info("----消费者1收到消息:{}----", str);
}
@RabbitListener(queues = "task_queue")
public void receive2(String str) {
log.info("----消费者2收到消息:{}----", str);
}
}
消息合理分发
autoAck=false,关闭自动确认回执模式.
SpringBot封装的api默认为手动确认回执模式,SpringBoot会自动发送回执.每次抓取消息数量
设置qos=1,每次只抓取一条消息
SpringBoot默认每次抓取250条消息
spring:
rabbitmq:
host: 192.168.64.140
port: 5672
username: admin
password: admin
listener:
simple:
prefetch: 1 #设置每次抓取一条消息
消息持久化
队列持久化
new Queue(队列名)
,默认为持久化队列,new Queue(队列名,false)
为非持久化队列消息数据持久化
使用amqpTemplate.convertAndSend
发送的消息默认为持久化的消息amqpTemplate.convertAndSend(队列名,消息,消息预处理对象)
,消息与处理对象中可以获取消息属性,可以把持久化属性改为非持久,可以发送非持久消息
发布订阅模式(资源共享)
发布订阅模式需要使用fanout交换机
创建交换机
@Bean
public FanoutExchange fanoutExchange() {
//非持久化和不自动删除,默认为持久化和自动删除
return new FanoutExchange("logs",false,false);
}
创建消息提供者
amqpTemplate.convertAndSend("交换机名","队列名或路由键","消息");
,三个参数依次为:
- 交换机名
- 队列名或路由键
- 消息
@Component
public class Provider {
@Autowired
private AmqpTemplate amqpTemplate;
public void send() {
while (true) {
System.out.print("请输入消息:");
String s = new Scanner(System.in).nextLine();
amqpTemplate.convertAndSend("logs", "", s);
}
}
}
创建消息消费者
@Slf4j
@Component
public class Consumer {
@RabbitListener(bindings = @QueueBinding(
//随机命名队列,参数为false,true,true--->非持久,独占,自动删除
value = @Queue,
//绑定交换机名字,declare="false"表示不创建交换机,只是引用存在的交换机
exchange = @Exchange(name = "logs", declare = "false")
))
public void receive1(String str) {
log.info("----消费者1收到消息:{}----", str);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue, exchange = @Exchange(name = "logs", declare = "false")))
public void receive2(String str) {
log.info("----消费者2收到消息:{}----", str);
}
}
路由模式
路由模式需要使用direct交换机
创建direct交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct_logs", false, false);
}
创建消息生产者
@Component
public class Provider {
@Autowired
private AmqpTemplate amqpTemplate;
public void send() {
while (true) {
System.out.print("请输入消息:");
String s = new Scanner(System.in).nextLine();
System.out.print("请输入路由键:");
String k = new Scanner(System.in).nextLine();
amqpTemplate.convertAndSend("direct_logs", k, s);
}
}
}
创建消息消费者
@Slf4j
@Component
public class Consumer {
@RabbitListener(bindings = @QueueBinding(
//随机命名队列,参数为false,true,true--->非持久,独占,自动删除
value = @Queue,
//绑定交换机名字,declare="false"表示不创建交换机,只是引用存在的交换机
exchange = @Exchange(name = "direct_logs", declare = "false"),
//绑定路由键
key = {"error"}
))
public void receive1(String str) {
log.info("----消费者1收到消息:{}----", str);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name = "direct_logs", declare = "false"),
key = {"info", "error", "waring"}
))
public void receive2(String str) {
log.info("----消费者2收到消息:{}----", str);
}
}
主题模式
主题模式需要使用topic交换机
topic交换机的路由键格式为*.*.*
,支持正则表达式,*
代表任意字符,#
代表任意字符且任意个值
创建topic交换机
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic_logs", false, false);
}
创建消息生产者
@Component
public class Provider {
@Autowired
private AmqpTemplate amqpTemplate;
public void send() {
while (true) {
System.out.print("请输入消息:");
String s = new Scanner(System.in).nextLine();
System.out.print("请输入路由键:");
String k = new Scanner(System.in).nextLine();
amqpTemplate.convertAndSend("topic_logs", k, s);
}
}
}
创建消息消费者
@Slf4j
@Component
public class Consumer {
@RabbitListener(bindings = @QueueBinding(
//随机命名队列,参数为false,true,true--->非持久,独占,自动删除
value = @Queue,
//绑定交换机名字,declare="false"表示不创建交换机,只是引用存在的交换机
exchange = @Exchange(name = "topic_logs", declare = "false"),
//绑定路由键
key = {"*.orange.*"}
))
public void receive1(String str) {
log.info("----消费者1收到消息:{}----", str);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name = "topic_logs", declare = "false"),
key = {"*.*.rabbit", "info.*", "lazy.#"}
))
public void receive2(String str) {
log.info("----消费者2收到消息:{}----", str);
}
}
自定义随机命名队列
使用@RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(name = "topic_logs", declare = "false"),
中value=@Queue方式绑定的对列为spring随机创建的队列,如下所示就是使用spring创建的随机队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name = "topic_logs", declare = "false"),
key = {"*.*.rabbit", "info.*", "lazy.#"}
))
自定义随机队列
/**
* 放入Spring容器的对象键值对
* key---randomQueue
* value--Queue实例
*
* @return 随机命名的队列
*/
@Bean
public Queue randomQueue() {
return new Queue(UUID.randomUUID().toString(), false, true, true);
}
使用自定义的随机队列
@RabbitListener(bindings = @QueueBinding(
//随机命名队列,参数为false,true,true--->非持久,独占,自动删除
value = @Queue(name = "#{randomQueue.name}", declare = "false"),
//绑定交换机名字,declare="false"表示不创建交换机,只是引用存在的交换机
exchange = @Exchange(name = "topic_logs", declare = "false"),
//绑定路由键
key = {"*.orange.*"}
))
public void receive1(String str) {
log.info("----消费者1收到消息:{}----", str);
}
其中#{}
为 SPEL——Spring Exception Language
可以直接访问Spring容器中的对象
${}
为 OGNL——Object Graph Navigation Language
Struts2中提供的一种表达式语言
#{randomQueue.name}
可以获取创建的随机队列名