Hello world 最简单的模式
我们将用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者,实现最简单的工作模式
在下图中,“ P” 是我们的生产者,“ C” 是我们的消费者。中间的框是一个队列 RabbitMQ 代表使用者保留的消息缓冲区
连接的时候,需要开启 5672 端口
- 依赖
```xml
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
com.rabbitmq amqp-client 5.8.0 commons-io commons-io 2.6
- **消息生产者**
```java
public class Publisher {
public static final String Queue = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.31.168");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
//channel 实现了自动 close 接口 自动关闭 不需要显示关闭
//创建连接 并 获取信道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化, 当mq重启后,是否还在
* 3.是否独占 只能有一个消费者监听该队列;当Connection关闭时,是否删除队列
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除
* 5.其他参数
*/
channel.queueDeclare(Queue, true, false, false, null);
String message = "hello world";
/**
* 发送一个消息
* 1.发送到哪个交换机,简单模式下会使用默认交换机""
* 2.路由的 key 是哪个
* 3.其他的参数信息
* 4.发送消息的消息体
*/
channel.basicPublish("", Queue, null, message.getBytes());
// 释放资源
channel.close;
connection.close();
System.out.println("消息发送完毕");
}
}
消息消费者 ```java public class Consumer { public static final String Queue = “simple_queue”;
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.168"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); //channel 实现了自动 close 接口 自动关闭 不需要显示关闭 //创建连接 并 获取信道 Channel channel = factory.newConnection().createChannel(); // 方式一 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println(message); }; CancelCallback cancelCallback = (consumerTag) -> System.out.println("消息消费被中断"); /** * 消费者消费消息 - 接受消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费者成功消费的回调 * 4.消息被取消时的回调 */ channel.basicConsume(Queue, true, deliverCallback, cancelCallback); // 方式二 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag" + consumerTag); System.out.println(envelope.getExchange()); System.out.println(envelope.getRoutingKey()); System.out.println("body" + new String(body)); } }; channel.basicConsume(Queue, true, consumer);
}
}
<a name="OdW0I"></a>
## Work Queues 工作队列模式
---

- Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
- 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
<a name="nO05Q"></a>
### 轮询分发消息
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
**1、抽取工具类**
```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMqUtils {
//得到一个连接的 channel
public static Channel getChannel() throws Exception {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("42.192.149.71");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
2、启动两个工作线程来接受消息
public class Worker01 {
public static final String Queue = "simple_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("接收到消息:" + message);
};
CancelCallback cancelCallback = (consumerTag) -> System.out.println("消息消费被中断");
/**
* 消费者消费消息 - 接受消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
* 3.消费者成功消费的回调
* 4.消息被取消时的回调
*/
System.out.println("C1 消费者启动等待消费.................. ");
channel.basicConsume(Queue, true, deliverCallback, cancelCallback);
}
}
复制一个实例
3、启动一个发送消息线程
public class Task01 {
public static final String Queue = "simple_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", Queue, null, message.getBytes());
System.out.println("消息发送完成:" + message);
}
}
}
- 结果展示
通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且是按照有序的一个接收一次消息
Pub/Sub 订阅模式
1、模式说明
在订阅模型中,多了一个Exchange角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接受者,会一直等待消息到来
- Queue:消息队列,接受消息、缓存消息
- Exchange:交换机(X),一方面接收生产者发送的消息。另一方面,决定如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有如下3中常见类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
Exchange(交换机)只负责转发信息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
2、代码编写
1、定义交换机和队列
public class Publisher {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
// 5.创建交换机
/**
* 参数:
* 1. exchange:交换机名称
* 2. type:交换机类型
* DIRECT("direct"),:定向
* FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
* TOPIC("topic"),通配符的方式
* HEADERS("headers");参数匹配
*
* 3. durable:是否持久化
* 4. autoDelete:自动删除
* 5. internal:内部使用。 一般false
* 6. arguments:参数
*
* */
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
// 6.创建队列
String queue1 = "test_fanout_queue1";
String queue2 = "test_fanout_queue2";
channel.queueDeclare(queue1, true, false, false, null);
channel.queueDeclare(queue2, true, false, false, null);
// 7.绑定队列和交换机
/**
* 参数:
* 1. queue:队列名称
* 2. exchange:交换机名称
* 3. routingKey:路由键,绑定规则
* 如果交换机的类型为fanout ,routingKey设置为""
*/
channel.queueBind(queue1, exchangeName, "");
channel.queueBind(queue2, exchangeName, "");
// 8.发送消息
String message = "INFO: 线程MAIN 调用了 findAll方法2。。。";
channel.basicPublish(exchangeName, "", null, message.getBytes());
// 9.释放资源
}
}
2、定义两个消费者
public class Consumer1 {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
String queue1 = "test_fanout_queue1";
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("Consumer1 收到消息:" + message);
};
CancelCallback cancelCallback = (consumerTag) -> System.out.println("消息消费被中断");
channel.basicConsume(queue1, true, deliverCallback, cancelCallback);
}
}
public class Consumer2 {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
String queue2 = "test_fanout_queue2";
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("Consumer2 收到消息:" + message);
};
CancelCallback cancelCallback = (consumerTag) -> System.out.println("消息消费被中断");
channel.basicConsume(queue2, true, deliverCallback, cancelCallback);
}
}
3、总结
- 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
- 发布订阅模式与工作队列模式的区别:
- 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
- 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认的交换机)
- 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会讲队列绑定到默认的交换机。
Routing 路由模式
1、模式说明
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个Routing Key
- 消息的发送方在向Exchange发送消息时,也必须指定消息的Routing Key
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的RoutingKey与消息的Routing Key完全一致,才会接收到消息
- P:生产者,向Exchange发送消息,发送消息时,会指定一个routing Key
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列
- C1:消费者,其所在队列指定了需要routing key为error的消息
- C2:消费者,其所在队列指定了需要routing key为info、error、warning的消息
2、代码编写
public class Publisher {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
// 5.创建交换机
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
// 6.创建队列
String queue1 = "test_direct_queue1";
String queue2 = "test_direct_queue2";
channel.queueDeclare(queue1, true, false, false, null);
channel.queueDeclare(queue2, true, false, false, null);
// 7.绑定队列和交换机
/**
* 参数:
* 1. queue:队列名称
* 2. exchange:交换机名称
* 3. routingKey:路由键,绑定规则
* 如果交换机的类型为direct ,routingKey相同的消息才会转发到相应的队列
*/
//队列1绑定 error
channel.queueBind(queue1, exchangeName, "error");
//队列2绑定 info error warning
channel.queueBind(queue2, exchangeName, "info");
channel.queueBind(queue2, exchangeName, "error");
channel.queueBind(queue2, exchangeName, "warning");
// 8.发送消息
String message = "error: 线程MAIN 调用。。。";
channel.basicPublish(exchangeName, "error", null, message.getBytes());
// 9.释放资源
}
}
public class Consumer1 {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
String queue1 = "test_direct_queue1";
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("Consumer1 收到消息:" + message);
};
CancelCallback cancelCallback = (consumerTag) -> System.out.println("消息消费被中断");
channel.basicConsume(queue1, true, deliverCallback, cancelCallback);
}
}
public class Consumer2 {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
String queue1 = "test_direct_queue2";
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("Consumer2 收到消息:" + message);
};
CancelCallback cancelCallback = (consumerTag) -> System.out.println("消息消费被中断");
channel.basicConsume(queue1, true, deliverCallback, cancelCallback);
}
}
结果
- 当发送error消息时,C1,C2都收到了
- 当发送info消息时,只有C2收到
3、总结
Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合routing key的队列。
Topics 通配符模式
1、模式说明
- Topic类型与Direct相比,都是可以根据Routing Key把消息路由到不同的队列。只不过Topic类型 Exchange可以让队列在绑定Routing key的时候使用通配符!
- RoutingKey一般都是具有一个或多个单词组成,多个单词之间以 “.” 分割,例如:item.insert
- 通配符规则:
- # 匹配 一个或多个单词,例如:item.# 能够匹配到 item.insert.abc,item.insert
- * 匹配一个单词,例如:item.* 只能匹配到 item.insert
图解:
- 红色Queue:绑定的是usa.#,因此凡是以 usa. 开头的routing key都会被匹配到
- 黄色Queue:绑定的是 #.news,因此凡是以 .news 结尾的routing key都会被匹配
2、代码编写
public class Publisher {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
// 5.创建交换机
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
// 6.创建队列
String queue1 = "test_topic_queue1";
String queue2 = "test_topic_queue2";
channel.queueDeclare(queue1, true, false, false, null);
channel.queueDeclare(queue2, true, false, false, null);
// 7.绑定队列和交换机
/**
* 参数:
* 1. queue:队列名称
* 2. exchange:交换机名称
* 3. routingKey:路由键,绑定规则
* 如果交换机的类型为topic ,routingKey相同的消息才会转发到相应的队列
*/
//队列1绑定 error
channel.queueBind(queue1, exchangeName, "#.error");
channel.queueBind(queue1, exchangeName, "order.*");
//队列2绑定 info error warning
channel.queueBind(queue2, exchangeName, "*.*");
// 8.发送消息
String message = "error: 线程MAIN 调用。。。";
channel.basicPublish(exchangeName, "goods.info", null, message.getBytes());
// 9.释放资源
}
}
public class Consumer1 {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
String queue1 = "test_topic_queue1";
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("Consumer1 收到消息:" + message);
};
CancelCallback cancelCallback = (consumerTag) -> System.out.println("消息消费被中断");
channel.basicConsume(queue1, true, deliverCallback, cancelCallback);
}
}
public class Consumer2 {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
String queue1 = "test_topic_queue2";
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("Consumer2 收到消息:" + message);
};
CancelCallback cancelCallback = (consumerTag) -> System.out.println("消息消费被中断");
channel.basicConsume(queue1, true, deliverCallback, cancelCallback);
}
}
结果
发送good.info,只有C2收到了
3、小结
Topic主题可以实现Pub/Sub发布与订阅和Routing路由模式的功能,只是Topic在配置routing key的时候可以使用通配符,显得更加灵活。