- RabbitMQ
- ActiveMQ
- RocketMQ(阿里开源)
- Kafka
- TubeMQ(腾讯开源)
RabboitMQ
搭建RabbitMQ服务器
- 克隆docker-bash:rabbitmq
- 设置ip
./ip-staticip:192.168.64.140
- 下载rabbitmq镜像
docker pull rabbitmq:management#或者从 code 下载 rabbit-image.gz#上传到服务器,然后执行镜像导入docker load -i rabbit-image.gz
- 准备配置文件,配置管理员用户名和密码
- 关闭防火墙
systemctl stop firewalldsystemctl disable firewalld
- 重启docker服务
systemctl restart docker
- 添加配置文件
mkdir /etc/rabbitmqvim /etc/rabbitmq/rabbitmq.conf
- 新增管理员用户名密码配置
default_user = admindefault_pass = admin
- 启动rabbitmq服务器
docker run -d --name rabbit \-p 5672:5672 \-p 15672:15672 \-v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \-e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \rabbitmq:management
- 访问控制台
[http://192.168.64.140:15672](http://192.168.64.140:15672),用户名密码为admin
简单消费者和生产者简单模式
简单模式中只有一个生产者和一个消费者,生产者发送消息,消费者接收消息
创建连接
package m1;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @Author: 一拳超人* @Date: 2021/10/21 17:16*/public class Connection {public static com.rabbitmq.client.Connection connectionInfo() throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.64.140");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");return connectionFactory.newConnection();}}
创建生产者
package m1;import com.rabbitmq.client.Channel;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** RabbitMQ生产者** @Author: 一拳超人* @Date: 2021/10/21 16:46*/public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//创建连接com.rabbitmq.client.Connection connection = m1.Connection.connectionInfo();//通信通道Channel channel = connection.createChannel();/*2.在服务器中创建队列hello world参数说明:1.队列名2.是否为持久队列3.是否为排他(独占)队列(不会被多个消费者独占)4.是否自动删除5.队列得其他参数属性*/channel.queueDeclare("HelloWorld", false, false, false, null);/*3.向hello word队列发送消息参数说明:1.交换机,空串表示默认交换机2.队列名3.消息属性4.消息正文*/channel.basicPublish("", "HelloWorld", null, "万年的马钊".getBytes());channel.close();}}
创建生产者
package m1;import com.rabbitmq.client.Channel;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** RabbitMQ生产者** @Author: 一拳超人* @Date: 2021/10/21 16:46*/public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//创建连接com.rabbitmq.client.Connection connection = m1.Connection.connectionInfo();//通信通道Channel channel = connection.createChannel();/*2.在服务器中创建队列hello world参数说明:1.队列名2.是否为持久队列3.是否为排他(独占)队列(不会被多个消费者独占)4.是否自动删除5.队列得其他参数属性*/channel.queueDeclare("HelloWorld", false, false, false, null);/*3.向hello word队列发送消息参数说明:1.交换机,空串表示默认交换机2.队列名3.消息属性4.消息正文*/channel.basicPublish("", "HelloWorld", null, "万年的马钊".getBytes());channel.close();}}
工作模式
工作模式由一个生产者多个消费者构成,一个生产者发送消息,多个消费者之间轮询接受消息,且每个消费者获取的消息唯一,多个消费者之间共用一个队列.
合理收发消息
合理收发消息,需要让服务器知道消费者是否处理完消息,
ACK
使用ACK(Ackonw led)发送消息回执,用以向服务器发送通知,告知服务器一条消息已经处理完毕,服务器可以通过ack知道消息是否处理完毕,消费者使用basicConsume方法接收消息时,第二个参数为是否手动ack,true为自动确认,false为手动确认.
手动确认需要在回调对象中设置消息回执,使用basicAck(DeliveryTag,multiple)方法.
方法参数:
- DeliveryTag:long类型,回执
- mutiple:布尔类型,表示是否确认全部消息
QOS
qos可以设置每次抓取消息的数量,在抓取的消息处理完成前不会在抓取消息,在手动ack模式下才有效.
设置方式channel.basicQos(1);参数:prefetchCount – 服务器将传递的最大消息数,如果没有限制则为 0.
package m2;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import m1.Connection;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @Author: 一拳超人* @Date: 2021/10/22 9:37*/public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {//创建连接和通信通道Channel channel = Connection.connectionInfo().createChannel();//创建队列channel.queueDeclare("HelloWorld", false, false, false, null);//回调对象DeliverCallback deliverCallback = (consumer, message) -> {String s = new String(message.getBody());System.out.println("收到" + s);//处理消息,遍历字符串,遇到"."则暂停一秒,模拟耗时消息for (int i = 0; i < s.length(); i++) {if ('.' == s.charAt(i)) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}System.out.println("消息处理完成");/*处理完消息手动发送回执参数说明:1.DeliveryTag:long类型,回执标签2.multiple:布尔类型,是否确认之前的收到的所有消息*/channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};CancelCallback cancelCallback = consumerTag -> {};channel.basicQos(1);/*消费消息参数:1.队列名2.autoAck,true为自动确认,false为手动确认*/channel.basicConsume("HelloWorld", false, deliverCallback, cancelCallback);}}
消息持久化
消息的持久化是指当消息从交换机发送到队列之后,被消费者消费之前,服务器突然宕机重启,消息仍然存在。消息持久化的前提是队列持久化,假如队列不是持久化,那么消息的持久化毫无意义。
- 队列持久化
- 消息数据持久化
设置持久化队列
channel.queueDeclare("HelloWorld", true, false, false, null);
参数:
- 队列名
- 是否为持久化队列
- 是否为排他队列
- 是否自动删除(服务器将在队列不使用时删除)
将一个队列设置为持久队列只需要将第二个参数设置为true
消息持久化
channel.basicPublish("", "HelloWorld", MessageProperties.PERSISTENT_BASIC, s.getBytes(StandardCharsets.UTF_8));
其中MessageProperties.PERSISTENT_BASIC为持久化参数
发布订阅模式
将一条消息发送给所有消费者,同一条消息所有的消费者都可以收到.
交换机
- Direct 直连
- Fanout 扇出,扇形
- Topic 主题
Bus配置刷新
修改2,3,4,9添加依赖
- Bus
- Rabbitmq
binder-rabbit
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-bus</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId></dependency>
09添加依赖actuator
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency>
修改09的yml配置
- 暴漏bus-refresh:
- rabbitmq连接配置
server:port: 6001spring:application:name: config-servercloud:config:server:git:uri: https://gitee.com/G318705208/microservice-learningsearch-paths: /SpringCloud01/configusername: gqc318705208@163.compassword: Gqc19981120.rabbitmq:host: 192.168.64.140port: 5672username: adminpassword: admineureka:client:service-url:defaultZone: http://eureka1:2001/eureka,http://eureka2:2002/eurekamanagement:endpoints:web:exposure:include: bus-refresh
修改2,3,4的yml配置,修改config目录的三个文件并提交
- rabbitmq连接配置
- Bus发送刷新指令,其他模块接受指令并执行刷新操作,RabbitMQ模式为主题模式.
- 链路日志发送至rabbitmq,zipkin接受链路日志,RabbitMQ模式为简单模式
订单削峰
将订单信息发送至RabbitMQ
- 新建配置类用于新建队列
Queue为org.springframework.amqp.core.Queue包下的类,用于封装队列参数,RabbitMQ的自动配置类会自动发现Queue实例,并根据其中的参数连接服务器创建队列
import org.springframework.amqp.core.Queue;@Configurationpublic class RabbitMQConfig {/*** 新建Queue队列,用于封装队列参数* RabbitMQ的自动配置类会自动发现Queue实例,* 根据其中的参数连接服务器创建队列*/@Beanpublic Queue orderQueue() {return new Queue("orderQueue", true, false, false);}}
- 注入AmqpTemplate实例使用此实例将订单信息发送至RabbitMQ
使用amqpTamplate实例的convertAndSend方法将订单信息发送至消息队列,此方法可以转自动将数据转换为byte[]数组并发送.第一个参数为队列名称,第二个参数为要发送的消息信息
@Servicepublic class OrderServiceImpl implements OrderService {/*** 在RabbitAutoConfiguration中自动创建了AmqpTemplate实例*/@Autowiredprivate AmqpTemplate amqpTemplate;public String saveOrder(PdOrder pdOrder) throws Exception {//转换(自动将数据转换为byte[]数组)并发送amqpTemplate.convertAndSend("orderQueue", pdOrder);}
从RabbitMQ中取出消息并进行处理
新建消息处理类
@RabbitListener注解可以自动注册成为消费者,自动开始从队列接收消息,自动将消息发送至@RabbitHandler注解标注方法中@RabbitHandler用于指定处理消息的方法,配和@RabbitListener注解使用,且一个类中只能有一个orderService实例为订单处理接口,用于处理订单信息@Component@RabbitListener(queues = "orderQueue")public class OrderConsumer {@Autowiredprivate OrderService orderService;//@RabbitHandler配和@RabbitListener注解使用,用于指定处理消息的方法,且一个类中只能有一个@RabbitHandlerpublic void receive(PdOrder pdOrder) throws Exception {orderService.saveOrder(pdOrder);System.out.println("------------订单已保存----------");}}
处理订单信息 ```java @Service public class OrderServiceImpl implements OrderService { @Autowired PdOrderMapper pdOrderMapper;
@Autowired PdCartItemMapper pdCartItemMapper;
@Autowired PdItemMapper pdItemMapper;
@Autowired PdItemParamItemMapper pdItemParamItemMapper;
@Autowired PdShippingMapper pdShippingMapper;
@Autowired PdOrderItemMapper pdOrderItemMapper;
public String saveOrder(PdOrder pdOrder) throws Exception {
String orderId = pdOrder.getOrderId();PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId());pdOrder.setShippingName(pdShipping.getReceiverName());pdOrder.setShippingCode(pdShipping.getReceiverAddress());pdOrder.setStatus(1);//pdOrder.setPaymentType(1);pdOrder.setPostFee(10D);pdOrder.setCreateTime(new Date());double payment = 0;List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList());for (ItemVO itemVO : itemVOs) {PdOrderItem pdOrderItem = new PdOrderItem();String id = generateId();//String id="2";pdOrderItem.setId(id);pdOrderItem.setOrderId(orderId);pdOrderItem.setItemId("" + itemVO.getPdItem().getId());pdOrderItem.setTitle(itemVO.getPdItem().getTitle());pdOrderItem.setPrice(itemVO.getPdItem().getPrice());pdOrderItem.setNum(itemVO.getPdCartItem().getNum());payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice();pdOrderItemMapper.insert(pdOrderItem);}pdOrder.setPayment(payment);pdOrderMapper.insert(pdOrder);return orderId;
} }
```
