DirectProducer: direct 类型交换器的生产者
MultiChannelConsumer: 一个连接下允许有多个信道
一个连接,我们可以使用多线程的方式模拟多个信道进行通讯。这样可以做到多路复用。
生产者
import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 类说明:direct类型交换器的生产者* 案例:ZJJ_RabbitMQ_2019/11/03_13:17:17_fk6sz*/public class DirectProducer {//声明交换器名字public final static String EXCHANGE_NAME = "direct_logs";public static void main(String[] args)throws IOException, TimeoutException {//创建连接、连接到RabbitMQConnectionFactory connectionFactory = new ConnectionFactory();//设置下连接工厂的连接地址(使用默认端口5672)connectionFactory.setHost("zjj101");/*设置登陆用户名和密码*/connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/connectionFactory.setVirtualHost("/");// connectionFactory.setHost("localhost");//创建连接Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//在信道中设置交换器 ,交换器是枚举channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//申明队列(放在消费者中去做)//申明路由键\消息体String[] routeKeys = {"king", "mark", "james"};for (int i = 0; i < 6; i++) {String routeKey = routeKeys[i % 3];String msg = "Hello,RabbitMQ" + (i + 1);//发布消息channel.basicPublish(EXCHANGE_NAME, routeKey, null, msg.getBytes());System.out.println("Sent:" + routeKey + ":" + msg);}channel.close();connection.close();}}
输出:
Sent:king:Hello,RabbitMQ1Sent:mark:Hello,RabbitMQ2Sent:james:Hello,RabbitMQ3Sent:king:Hello,RabbitMQ4Sent:mark:Hello,RabbitMQ5Sent:james:Hello,RabbitMQ6
消费者
import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 类说明:一个连接多个信道* 使用了多线程的支持* 演示消费者使用一个连接两个信道* 案例:ZJJ_RabbitMQ_2019/11/03_13:19:03_2vcj5*/public class MultiChannelConsumer {private static class ConsumerWorker implements Runnable {final Connection connection;public ConsumerWorker(Connection connection) {this.connection = connection;}public void run() {try {/*创建一个信道,意味着每个线程单独一个信道*/Channel channel = connection.createChannel();//信道设置交换器类型(direct)channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 声明一个随机队列String queueName = channel.queueDeclare().getQueue();//String queueName = "queue-king"; // 同一个队列//消费者名字,打印输出用final String consumerName = Thread.currentThread().getName() + "-all";/*队列绑定到交换器上时,是允许绑定多个路由键的,也就是多重绑定*/String[] routekeys = {"king", "mark", "james"};for (String routekey : routekeys) {channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME,routekey);}System.out.println("[" + consumerName + "] Waiting for messages:");// 创建队列消费者final Consumer consumerA = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicPropertiesproperties,byte[] body)throws IOException {String message =new String(body, "UTF-8");System.out.println(consumerName+ " Received " + envelope.getRoutingKey()+ ":'" + message + "'");}};channel.basicConsume(queueName, true, consumerA);} catch (Exception e) {e.printStackTrace();}}}public static void main(String[] argv) throws IOException,InterruptedException, TimeoutException {//连接工厂ConnectionFactory factory = new ConnectionFactory();//连接rabbitMq的地址factory.setHost("zjj101");/*设置登陆用户名和密码*/factory.setUsername("guest");factory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/factory.setVirtualHost("/");// 打开连接和创建频道,与发送端一样Connection connection = factory.newConnection();//一个连接多个信道for (int i = 0; i < 2; i++) {/*将连接作为参数,传递给每个线程*/Thread worker = new Thread(new ConsumerWorker(connection));worker.start();}}}
输出:
[Thread-0-all] Waiting for messages:[Thread-1-all] Waiting for messages:Thread-1-all Received king:'Hello,RabbitMQ1'Thread-1-all Received mark:'Hello,RabbitMQ2'Thread-1-all Received james:'Hello,RabbitMQ3'Thread-0-all Received king:'Hello,RabbitMQ1'Thread-0-all Received mark:'Hello,RabbitMQ2'Thread-0-all Received james:'Hello,RabbitMQ3'Thread-0-all Received king:'Hello,RabbitMQ4'Thread-0-all Received mark:'Hello,RabbitMQ5'Thread-0-all Received james:'Hello,RabbitMQ6'Thread-1-all Received king:'Hello,RabbitMQ4'Thread-1-all Received mark:'Hello,RabbitMQ5'Thread-1-all Received james:'Hello,RabbitMQ6'
