MultiConsumerOneQueue: 一个队列多个消费者,则会表现出消息在消费者之间的轮询发送
生产者
import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 类说明:direct类型交换器的生产者* 案例:ZJJ_RabbitMQ_2019/11/03_13:17:17_fk6sz*/public class DirectProducer {//声明交换器名字public final static String EXCHANGE_NAME = "direct_logs";public static void main(String[] args)throws IOException, TimeoutException {//创建连接、连接到RabbitMQConnectionFactory connectionFactory = new ConnectionFactory();//设置下连接工厂的连接地址(使用默认端口5672)connectionFactory.setHost("zjj101");/*设置登陆用户名和密码*/connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/connectionFactory.setVirtualHost("/");// connectionFactory.setHost("localhost");//创建连接Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//在信道中设置交换器 ,交换器是枚举channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//申明队列(放在消费者中去做)//申明路由键\消息体String[] routeKeys = {"king", "mark", "james"};for (int i = 0; i < 6; i++) {String routeKey = routeKeys[i % 3];String msg = "Hello,RabbitMQ" + (i + 1);//发布消息channel.basicPublish(EXCHANGE_NAME, routeKey, null, msg.getBytes());System.out.println("Sent:" + routeKey + ":" + msg);}channel.close();connection.close();}}
输出:
Sent:king:Hello,RabbitMQ1Sent:mark:Hello,RabbitMQ2Sent:james:Hello,RabbitMQ3Sent:king:Hello,RabbitMQ4Sent:mark:Hello,RabbitMQ5Sent:james:Hello,RabbitMQ6
消费者
import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 类说明:一个队列绑定多个消费者,则会表现出消息在消费者之间的轮询发送。* 案例:ZJJ_RabbitMQ_2019/11/03_13:19:31_krcss*/public class MultiConsumerOneQueue {private static class ConsumerWorker implements Runnable {final Connection connection;final String queueName;public ConsumerWorker(Connection connection, String queueName) {this.connection = connection;this.queueName = queueName;}public void run() {try {/*创建一个信道,意味着每个线程单独一个信道*/final Channel channel = connection.createChannel();//信道设置交换器类型(direct)channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);/*声明一个队列,rabbitmq,如果队列已存在,不会重复创建*/channel.queueDeclare(queueName, false, false, false, null);//消费者名字,打印输出用final String consumerName = Thread.currentThread().getName();/*队列绑定到交换器上时,是允许绑定多个路由键的,也就是多重绑定*/String[] routekeys = {"king", "mark", "james"};for (String routekey : routekeys) {channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME,routekey);}System.out.println(" [" + consumerName + "] Waiting for messages:");// 创建队列消费者final Consumer consumerA = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicPropertiesproperties,byte[] body)throws IOException {String message =new String(body, "UTF-8");System.out.println(consumerName+ " Received " + envelope.getRoutingKey()+ ":'" + message + "'");}};channel.basicConsume(queueName, true, consumerA);} catch (Exception e) {e.printStackTrace();}}}public static void main(String[] argv) throws IOException,InterruptedException, TimeoutException {//连接工厂ConnectionFactory factory = new ConnectionFactory();//连接rabbitMq的地址factory.setHost("zjj101");/*设置登陆用户名和密码*/factory.setUsername("guest");factory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/factory.setVirtualHost("/");// 打开连接和创建频道,与发送端一样Connection connection = factory.newConnection();//3个线程,线程之间共享队列,一个队列多个消费者String queueName = "focusAll";for (int i = 0; i < 3; i++) {/*将队列名作为参数,传递给每个线程*/Thread worker = new Thread(new ConsumerWorker(connection, queueName));worker.start();}}}
输出:
[Thread-2] Waiting for messages:[Thread-1] Waiting for messages:[Thread-0] Waiting for messages:Thread-2 Received king:'Hello,RabbitMQ1'Thread-2 Received king:'Hello,RabbitMQ4'Thread-0 Received mark:'Hello,RabbitMQ2'Thread-0 Received mark:'Hello,RabbitMQ5'Thread-1 Received james:'Hello,RabbitMQ3'Thread-1 Received james:'Hello,RabbitMQ6'
