使用 DirectProducer 作为生产者,NormalConsumer 作为消费者,消费者绑定一个队列
生产者
direct 类型交换器的生产者
import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 类说明:direct类型交换器的生产者* 案例:ZJJ_RabbitMQ_2019/11/03_13:17:17_fk6sz*/public class DirectProducer {//声明交换器名字public final static String EXCHANGE_NAME = "direct_logs";public static void main(String[] args)throws IOException, TimeoutException {//创建连接、连接到RabbitMQConnectionFactory connectionFactory = new ConnectionFactory();//设置下连接工厂的连接地址(使用默认端口5672)connectionFactory.setHost("zjj101");/*设置登陆用户名和密码*/connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/connectionFactory.setVirtualHost("/");// connectionFactory.setHost("localhost");//创建连接Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//在信道中设置交换器 ,交换器是枚举channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//申明队列(放在消费者中去做)//申明路由键\消息体String[] routeKeys = {"king", "mark", "james"};for (int i = 0; i < 6; i++) {String routeKey = routeKeys[i % 3];String msg = "Hello,RabbitMQ" + (i + 1);//发布消息channel.basicPublish(EXCHANGE_NAME, routeKey, null, msg.getBytes());System.out.println("Sent:" + routeKey + ":" + msg);}channel.close();connection.close();}}
消费者
普通的消费者
import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 类说明:普通的消费者*/public class NormalConsumer {public static void main(String[] argv)throws IOException, TimeoutException {//创建连接、连接到RabbitMQConnectionFactory connectionFactory = new ConnectionFactory();//设置下连接工厂的连接地址(使用默认端口5672)connectionFactory.setHost("zjj101");/*设置登陆用户名和密码*/connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/connectionFactory.setVirtualHost("/");//创建连接Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//在信道中设置交换器channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//申明队列(放在消费者中去做)String queueName = "queue-king";channel.queueDeclare(queueName, false, false, false, null);//绑定:将队列(queuq-king)与交换器通过 路由键 绑定(king)String routeKey = "king";channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME, routeKey);System.out.println("等待接收消息 ......");//申明一个消费者final Consumer consumer = new DefaultConsumer(channel) {/**** @param s* @param envelope* @param basicProperties* @param bytes 是生产者发送过来的消息* @throws IOException*/@Overridepublic void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {String message = new String(bytes, "UTF-8");System.out.println("收到routingKey为:[" + envelope.getRoutingKey() + "]的消息,消息内容是: " + message);}};//消息者正是开始在指定队列上消费。(queue-king)/** 参数1: 队列名称* 参数2:自动确认参数,如果是true则是自动确认,意思是我每一次消费了,* 我需要给RabbitMQ确认一下我已经消费完了* 参数3:消费者*/channel.basicConsume(queueName, true, consumer);}}
输出结果:
等待接收消息 ......收到routingKey为:[king]的消息,消息内容是: Hello,RabbitMQ1收到routingKey为:[king]的消息,消息内容是: Hello,RabbitMQ4
