• RabbitMQ
  • ActiveMQ
  • RocketMQ(阿里开源)
  • Kafka
  • TubeMQ(腾讯开源)

RabboitMQ

搭建RabbitMQ服务器

  1. 克隆docker-bash:rabbitmq
  2. 设置ip
  1. ./ip-static
  2. ip:192.168.64.140
  1. 下载rabbitmq镜像
  1. docker pull rabbitmq:management
  2. #或者从 code 下载 rabbit-image.gz
  3. #上传到服务器,然后执行镜像导入
  4. docker load -i rabbit-image.gz
  1. 准备配置文件,配置管理员用户名和密码
    • 关闭防火墙
  1. systemctl stop firewalld
  2. systemctl disable firewalld
  • 重启docker服务
  1. systemctl restart docker
  • 添加配置文件
  1. mkdir /etc/rabbitmq
  2. vim /etc/rabbitmq/rabbitmq.conf
  • 新增管理员用户名密码配置
  1. default_user = admin
  2. default_pass = admin
  1. 启动rabbitmq服务器
  1. docker run -d --name rabbit \
  2. -p 5672:5672 \
  3. -p 15672:15672 \
  4. -v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
  5. -e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
  6. rabbitmq:management
  1. 访问控制台[http://192.168.64.140:15672](http://192.168.64.140:15672),用户名密码为admin

简单消费者和生产者简单模式

简单模式中只有一个生产者和一个消费者,生产者发送消息,消费者接收消息

创建连接

  1. package m1;
  2. import com.rabbitmq.client.ConnectionFactory;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * @Author: 一拳超人
  7. * @Date: 2021/10/21 17:16
  8. */
  9. public class Connection {
  10. public static com.rabbitmq.client.Connection connectionInfo() throws IOException, TimeoutException {
  11. ConnectionFactory connectionFactory = new ConnectionFactory();
  12. connectionFactory.setHost("192.168.64.140");
  13. connectionFactory.setPort(5672);
  14. connectionFactory.setUsername("admin");
  15. connectionFactory.setPassword("admin");
  16. return connectionFactory.newConnection();
  17. }
  18. }

创建生产者

  1. package m1;
  2. import com.rabbitmq.client.Channel;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * RabbitMQ生产者
  7. *
  8. * @Author: 一拳超人
  9. * @Date: 2021/10/21 16:46
  10. */
  11. public class Producer {
  12. public static void main(String[] args) throws IOException, TimeoutException {
  13. //创建连接
  14. com.rabbitmq.client.Connection connection = m1.Connection.connectionInfo();
  15. //通信通道
  16. Channel channel = connection.createChannel();
  17. /*
  18. 2.在服务器中创建队列hello world
  19. 参数说明:
  20. 1.队列名
  21. 2.是否为持久队列
  22. 3.是否为排他(独占)队列(不会被多个消费者独占)
  23. 4.是否自动删除
  24. 5.队列得其他参数属性
  25. */
  26. channel.queueDeclare("HelloWorld", false, false, false, null);
  27. /*
  28. 3.向hello word队列发送消息
  29. 参数说明:
  30. 1.交换机,空串表示默认交换机
  31. 2.队列名
  32. 3.消息属性
  33. 4.消息正文
  34. */
  35. channel.basicPublish("", "HelloWorld", null, "万年的马钊".getBytes());
  36. channel.close();
  37. }
  38. }

创建生产者

  1. package m1;
  2. import com.rabbitmq.client.Channel;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * RabbitMQ生产者
  7. *
  8. * @Author: 一拳超人
  9. * @Date: 2021/10/21 16:46
  10. */
  11. public class Producer {
  12. public static void main(String[] args) throws IOException, TimeoutException {
  13. //创建连接
  14. com.rabbitmq.client.Connection connection = m1.Connection.connectionInfo();
  15. //通信通道
  16. Channel channel = connection.createChannel();
  17. /*
  18. 2.在服务器中创建队列hello world
  19. 参数说明:
  20. 1.队列名
  21. 2.是否为持久队列
  22. 3.是否为排他(独占)队列(不会被多个消费者独占)
  23. 4.是否自动删除
  24. 5.队列得其他参数属性
  25. */
  26. channel.queueDeclare("HelloWorld", false, false, false, null);
  27. /*
  28. 3.向hello word队列发送消息
  29. 参数说明:
  30. 1.交换机,空串表示默认交换机
  31. 2.队列名
  32. 3.消息属性
  33. 4.消息正文
  34. */
  35. channel.basicPublish("", "HelloWorld", null, "万年的马钊".getBytes());
  36. channel.close();
  37. }
  38. }

工作模式

工作模式由一个生产者多个消费者构成,一个生产者发送消息,多个消费者之间轮询接受消息,且每个消费者获取的消息唯一,多个消费者之间共用一个队列.

合理收发消息

合理收发消息,需要让服务器知道消费者是否处理完消息,

ACK

使用ACK(Ackonw led)发送消息回执,用以向服务器发送通知,告知服务器一条消息已经处理完毕,服务器可以通过ack知道消息是否处理完毕,消费者使用basicConsume方法接收消息时,第二个参数为是否手动ack,true为自动确认,false为手动确认.

手动确认需要在回调对象中设置消息回执,使用basicAck(DeliveryTag,multiple)方法.

方法参数:

  • DeliveryTag:long类型,回执
  • mutiple:布尔类型,表示是否确认全部消息

QOS

qos可以设置每次抓取消息的数量,在抓取的消息处理完成前不会在抓取消息,在手动ack模式下才有效.

设置方式channel.basicQos(1);参数:prefetchCount – 服务器将传递的最大消息数,如果没有限制则为 0.

  1. package m2;
  2. import com.rabbitmq.client.CancelCallback;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import m1.Connection;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. /**
  9. * @Author: 一拳超人
  10. * @Date: 2021/10/22 9:37
  11. */
  12. public class Consumer {
  13. public static void main(String[] args) throws IOException, TimeoutException {
  14. //创建连接和通信通道
  15. Channel channel = Connection.connectionInfo().createChannel();
  16. //创建队列
  17. channel.queueDeclare("HelloWorld", false, false, false, null);
  18. //回调对象
  19. DeliverCallback deliverCallback = (consumer, message) -> {
  20. String s = new String(message.getBody());
  21. System.out.println("收到" + s);
  22. //处理消息,遍历字符串,遇到"."则暂停一秒,模拟耗时消息
  23. for (int i = 0; i < s.length(); i++) {
  24. if ('.' == s.charAt(i)) {
  25. try {
  26. Thread.sleep(1000);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }
  32. System.out.println("消息处理完成");
  33. /*
  34. 处理完消息手动发送回执
  35. 参数说明:
  36. 1.DeliveryTag:long类型,回执标签
  37. 2.multiple:布尔类型,是否确认之前的收到的所有消息
  38. */
  39. channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
  40. };
  41. CancelCallback cancelCallback = consumerTag -> {
  42. };
  43. channel.basicQos(1);
  44. /*
  45. 消费消息
  46. 参数:
  47. 1.队列名
  48. 2.autoAck,true为自动确认,false为手动确认
  49. */
  50. channel.basicConsume("HelloWorld", false, deliverCallback, cancelCallback);
  51. }
  52. }

消息持久化

消息的持久化是指当消息从交换机发送到队列之后,被消费者消费之前,服务器突然宕机重启,消息仍然存在。消息持久化的前提是队列持久化,假如队列不是持久化,那么消息的持久化毫无意义。

  1. 队列持久化
  2. 消息数据持久化

设置持久化队列

channel.queueDeclare("HelloWorld", true, false, false, null);

参数:

  1. 队列名
  2. 是否为持久化队列
  3. 是否为排他队列
  4. 是否自动删除(服务器将在队列不使用时删除)

将一个队列设置为持久队列只需要将第二个参数设置为true

消息持久化

channel.basicPublish("", "HelloWorld", MessageProperties.PERSISTENT_BASIC, s.getBytes(StandardCharsets.UTF_8));

其中MessageProperties.PERSISTENT_BASIC为持久化参数

发布订阅模式

将一条消息发送给所有消费者,同一条消息所有的消费者都可以收到.

交换机

  1. Direct 直连
  2. Fanout 扇出,扇形
  3. Topic 主题

Bus配置刷新

  1. 修改2,3,4,9添加依赖

    1. Bus
    2. Rabbitmq
    3. binder-rabbit

      1. <dependency>
      2. <groupId>org.springframework.cloud</groupId>
      3. <artifactId>spring-cloud-bus</artifactId>
      4. </dependency>
      5. <dependency>
      6. <groupId>org.springframework.boot</groupId>
      7. <artifactId>spring-boot-starter-amqp</artifactId>
      8. </dependency>
      9. <dependency>
      10. <groupId>org.springframework.cloud</groupId>
      11. <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
      12. </dependency>
  2. 09添加依赖actuator

    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-actuator</artifactId>
    4. </dependency>
  3. 修改09的yml配置

    1. 暴漏bus-refresh:
    2. rabbitmq连接配置
      1. server:
      2. port: 6001
      3. spring:
      4. application:
      5. name: config-server
      6. cloud:
      7. config:
      8. server:
      9. git:
      10. uri: https://gitee.com/G318705208/microservice-learning
      11. search-paths: /SpringCloud01/config
      12. username: gqc318705208@163.com
      13. password: Gqc19981120.
      14. rabbitmq:
      15. host: 192.168.64.140
      16. port: 5672
      17. username: admin
      18. password: admin
      19. eureka:
      20. client:
      21. service-url:
      22. defaultZone: http://eureka1:2001/eureka,http://eureka2:2002/eureka
      23. management:
      24. endpoints:
      25. web:
      26. exposure:
      27. include: bus-refresh
  4. 修改2,3,4的yml配置,修改config目录的三个文件并提交

    1. rabbitmq连接配置
  • Bus发送刷新指令,其他模块接受指令并执行刷新操作,RabbitMQ模式为主题模式.
  • 链路日志发送至rabbitmq,zipkin接受链路日志,RabbitMQ模式为简单模式

    订单削峰

    将订单信息发送至RabbitMQ

  1. 新建配置类用于新建队列

Queue为org.springframework.amqp.core.Queue包下的类,用于封装队列参数,RabbitMQ的自动配置类会自动发现Queue实例,并根据其中的参数连接服务器创建队列

  1. import org.springframework.amqp.core.Queue;
  2. @Configuration
  3. public class RabbitMQConfig {
  4. /**
  5. * 新建Queue队列,用于封装队列参数
  6. * RabbitMQ的自动配置类会自动发现Queue实例,
  7. * 根据其中的参数连接服务器创建队列
  8. */
  9. @Bean
  10. public Queue orderQueue() {
  11. return new Queue("orderQueue", true, false, false);
  12. }
  13. }
  1. 注入AmqpTemplate实例使用此实例将订单信息发送至RabbitMQ

使用amqpTamplate实例的convertAndSend方法将订单信息发送至消息队列,此方法可以转自动将数据转换为byte[]数组并发送.第一个参数为队列名称,第二个参数为要发送的消息信息

  1. @Service
  2. public class OrderServiceImpl implements OrderService {
  3. /**
  4. * 在RabbitAutoConfiguration中自动创建了AmqpTemplate实例
  5. */
  6. @Autowired
  7. private AmqpTemplate amqpTemplate;
  8. public String saveOrder(PdOrder pdOrder) throws Exception {
  9. //转换(自动将数据转换为byte[]数组)并发送
  10. amqpTemplate.convertAndSend("orderQueue", pdOrder);
  11. }

从RabbitMQ中取出消息并进行处理

  1. 新建消息处理类

    • @RabbitListener注解可以自动注册成为消费者,自动开始从队列接收消息,自动将消息发送至@RabbitHandler注解标注方法中
    • @RabbitHandler用于指定处理消息的方法,配和@RabbitListener注解使用,且一个类中只能有一个
    • orderService实例为订单处理接口,用于处理订单信息

      1. @Component
      2. @RabbitListener(queues = "orderQueue")
      3. public class OrderConsumer {
      4. @Autowired
      5. private OrderService orderService;
      6. //@RabbitHandler配和@RabbitListener注解使用,用于指定处理消息的方法,且一个类中只能有一个
      7. @RabbitHandler
      8. public void receive(PdOrder pdOrder) throws Exception {
      9. orderService.saveOrder(pdOrder);
      10. System.out.println("------------订单已保存----------");
      11. }
      12. }
  2. 处理订单信息 ```java @Service public class OrderServiceImpl implements OrderService { @Autowired PdOrderMapper pdOrderMapper;

    @Autowired PdCartItemMapper pdCartItemMapper;

    @Autowired PdItemMapper pdItemMapper;

    @Autowired PdItemParamItemMapper pdItemParamItemMapper;

    @Autowired PdShippingMapper pdShippingMapper;

    @Autowired PdOrderItemMapper pdOrderItemMapper;

    public String saveOrder(PdOrder pdOrder) throws Exception {

    1. String orderId = pdOrder.getOrderId();
    2. PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId());
    3. pdOrder.setShippingName(pdShipping.getReceiverName());
    4. pdOrder.setShippingCode(pdShipping.getReceiverAddress());
    5. pdOrder.setStatus(1);//
    6. pdOrder.setPaymentType(1);
    7. pdOrder.setPostFee(10D);
    8. pdOrder.setCreateTime(new Date());
    9. double payment = 0;
    10. List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList());
    11. for (ItemVO itemVO : itemVOs) {
    12. PdOrderItem pdOrderItem = new PdOrderItem();
    13. String id = generateId();
    14. //String id="2";
    15. pdOrderItem.setId(id);
    16. pdOrderItem.setOrderId(orderId);
    17. pdOrderItem.setItemId("" + itemVO.getPdItem().getId());
    18. pdOrderItem.setTitle(itemVO.getPdItem().getTitle());
    19. pdOrderItem.setPrice(itemVO.getPdItem().getPrice());
    20. pdOrderItem.setNum(itemVO.getPdCartItem().getNum());
    21. payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice();
    22. pdOrderItemMapper.insert(pdOrderItem);
    23. }
    24. pdOrder.setPayment(payment);
    25. pdOrderMapper.insert(pdOrder);
    26. return orderId;

    } }

```