DirectProducer: direct 类型交换器的生产者
NormalConsumer: 普通的消费者
MultiBindConsumer: 队列绑定到交换器上时,是允许绑定多个路由键的,也就是多重绑定
对比:单个绑定的消费者只能收到指定的消息,多重绑定的的消费者可以收到所有的消息
生产者
import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 类说明:direct类型交换器的生产者*/public class DirectProducer {//声明交换器名字public final static String EXCHANGE_NAME = "direct_logs";public static void main(String[] args)throws IOException, TimeoutException {//创建连接、连接到RabbitMQConnectionFactory connectionFactory = new ConnectionFactory();//设置下连接工厂的连接地址(使用默认端口5672)connectionFactory.setHost("zjj101");/*设置登陆用户名和密码*/connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/connectionFactory.setVirtualHost("/");// connectionFactory.setHost("localhost");//创建连接Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//在信道中设置交换器 ,交换器是枚举channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//申明队列(放在消费者中去做)//申明路由键\消息体String[] routeKeys = {"king", "mark", "james"};for (int i = 0; i < 6; i++) {String routeKey = routeKeys[i % 3];String msg = "Hello,RabbitMQ" + (i + 1);//发布消息channel.basicPublish(EXCHANGE_NAME, routeKey, null, msg.getBytes());System.out.println("Sent:" + routeKey + ":" + msg);}channel.close();connection.close();}}
消费者1
import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 类说明:队列和交换器的多重绑定* 多个路由键的绑定* 案例:ZJJ_RabbitMQ_2019/11/03_13:17:45_y9y2f*/public class MultiBindConsumer {public static void main(String[] argv) throws IOException, TimeoutException {//连接工厂ConnectionFactory factory = new ConnectionFactory();//连接rabbitMq的地址factory.setHost("zjj101");/*设置登陆用户名和密码*/factory.setUsername("guest");factory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/factory.setVirtualHost("/");// 打开连接和创建频道,与发送端一样Connection connection = factory.newConnection();//创建一个信道final Channel channel = connection.createChannel();//信道设置交换器类型(direct)channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//声明一个随机队列,每一次生成的队列都是不同的.String queueName = channel.queueDeclare().getQueue();/*队列绑定到交换器上时,是允许绑定多个路由键的,也就是多重绑定* 做法就是for循环执行多次方法* */String[] routekeys = {"king", "mark", "james"};for (String routekey : routekeys) {channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME,routekey);}System.out.println(" [*] 等待消息:");// 创建队列消费者final Consumer consumerA = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicPropertiesproperties,byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" 接收到routingKey为 "+ envelope.getRoutingKey() + ":'" + message+ "'");}};channel.basicConsume(queueName, true, consumerA);}}
输出:
[*] 等待消息:接收到routingKey为 king:'Hello,RabbitMQ1'接收到routingKey为 mark:'Hello,RabbitMQ2'接收到routingKey为 james:'Hello,RabbitMQ3'接收到routingKey为 king:'Hello,RabbitMQ4'接收到routingKey为 mark:'Hello,RabbitMQ5'接收到routingKey为 james:'Hello,RabbitMQ6'
消费者2
import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 类说明:普通的消费者*/public class NormalConsumer {public static void main(String[] argv)throws IOException, TimeoutException {//创建连接、连接到RabbitMQConnectionFactory connectionFactory = new ConnectionFactory();//设置下连接工厂的连接地址(使用默认端口5672)connectionFactory.setHost("zjj101");/*设置登陆用户名和密码*/connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/connectionFactory.setVirtualHost("/");//创建连接Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//在信道中设置交换器channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//申明队列(放在消费者中去做)String queueName = "queue-king";channel.queueDeclare(queueName, false, false, false, null);//绑定:将队列(queuq-king)与交换器通过 路由键 绑定(king)String routeKey = "king";channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME, routeKey);System.out.println("等待接收消息 ......");//申明一个消费者final Consumer consumer = new DefaultConsumer(channel) {/**** @param s* @param envelope* @param basicProperties* @param bytes 是生产者发送过来的消息* @throws IOException*/@Overridepublic void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {String message = new String(bytes, "UTF-8");System.out.println("收到routingKey为:[" + envelope.getRoutingKey() + "]的消息,消息内容是: " + message);}};//消息者正是开始在指定队列上消费。(queue-king)/** 参数1: 队列名称* 参数2:自动确认参数,如果是true则是自动确认,意思是我每一次消费了,* 我需要给RabbitMQ确认一下我已经消费完了* 参数3:消费者*/channel.basicConsume(queueName, true, consumer);}}
输出:
等待接收消息 ......收到routingKey为:[king]的消息,消息内容是: Hello,RabbitMQ1收到routingKey为:[king]的消息,消息内容是: Hello,RabbitMQ4
