初识MQ
同步通讯
异步通讯
MQ常见框架
MQ (MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
RabbitMQ快速入门
RabbitMQ概述和安装
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:
docker pull rabbitmq:3-management
docker run \
-e RABBITMQ_DEFAULT_USER=xiaochong \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
RabbitMQ中的几个概念:
- channel:操作MQ的工具
- exchange:路由消息到队列中
- queue:缓存消息
- virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
常见消息模型
快速入门
HelloWorld案例
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息
<dependencies>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("23.224.181.140");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("xiaochong");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("23.224.181.140");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("xiaochong");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
为什么消费者也创建队列,生产者也创建队列
因为,这两个服务部署是不确定的,不确定哪个先,已经存在就不会在创建
SpringAMQP
什么是SpringAMQP
SpringAmqp的官方地址:
Basic Queue简单队列模型
利用SpringAMQP实现HelloWorld中的基础消息队列功能
流程如下:
- 在父工程中引入spring-amqp的依赖
- 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
- 在consumer服务中编写消费逻辑,绑定simple.queue这个队列
步骤1:引入AMQP依赖
步骤2:在publisher中编写测试方法,向simple.queue发送消息
spring:
rabbitmq:
host: 23.224.181.140
port: 5672
username: xiaochong
password: 123321
virtual-host: /
package cn.itcast.mq.spring;
@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue(){
String queueName = "simple.queue";
String message = "hello,spring amqp!";
rabbitTemplate.convertAndSend(queueName,message);
}
}
步骤3:在consumer中编写消费逻辑,监听simple.queue
注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能
Work Queue工作队列
Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积
模拟WorkQueue,实现一个队列绑定多个消费者
基本思路如下:
- 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
- 在consumer服务中定义两个消息监听者,都监听simple.queue队列
- 消费者1每秒处理50条消息,消费者2每秒处理10条消息
发布( Publish )、订阅( Subscribe )
发布订阅-Fanout Exchange
利用SpringAMQP演示FanoutExchange的使用
步骤1:在consumer服务声明Exchange、Queue、Binding
package cn.itcast.mq;
@Configuration
public class FanoutConfig {
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("xiaochong");
}
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
步骤2:在consumer服务声明两个消费者
@RabbitListener(queues = "fanout.queue1")
public void listenWorkQueue(String msg) throws InterruptedException {
System.out.println("消费者1接收到信息: [" + msg + "]" + LocalTime.now());
}
@RabbitListener(queues = "fanout.queue2")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2接收到信息: [" + msg + "]" + LocalTime.now());
}
步骤3:在publisher服务发送消息到FanoutExchange
@Test
public void testSendFanoutExchange(){
//交换机名称
String exchangeName = "xiaochong";
//消息
String message = "hello,every one!";
//发送消息
rabbitTemplate.convertAndSend(exchangeName,"",message);
}