fanout广播交换器,只要消息来了,内部会给所以的队列都发一个消息.
生产者
import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 类说明:fanout生产者* 生产了三条消息*/public class FanoutProducer {public final static String EXCHANGE_NAME = "fanout_logs";public static void main(String[] args) throws IOException, TimeoutException {/*** 创建连接连接到MabbitMQ*/ConnectionFactory factory = new ConnectionFactory();// 设置MabbitMQ所在主机ip或者主机名factory.setHost("zjj101");factory.setUsername("guest");factory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/factory.setVirtualHost("/");// 创建一个连接Connection connection = factory.newConnection();// 创建一个信道Channel channel = connection.createChannel();// fanout交换器channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);/*日志消息级别,作为路由键使用*/String[] routekeys = {"king", "mark", "james"};for (int i = 0; i < 3; i++) {String routekey = routekeys[i % 3];//每一次发送一条消息// 发送的消息String message = "Hello World_" + (i + 1);//参数1:exchange name//参数2:routing keychannel.basicPublish(EXCHANGE_NAME, routekey,null, message.getBytes());System.out.println(" [x] Sent '" + routekey + "':'"+ message + "'");}// 关闭频道和连接channel.close();connection.close();}}
消息:
[x] Sent 'king':'Hello World_1'[x] Sent 'mark':'Hello World_2'[x] Sent 'james':'Hello World_3'
消费者1
import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 类说明:fanout消费者--绑定多个路由键*/public class Consumer1 {public static void main(String[] argv) throws IOException,InterruptedException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("zjj101");factory.setUsername("guest");factory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/factory.setVirtualHost("/");// 打开连接和创建频道,与发送端一样Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.exchangeDeclare(FanoutProducer.EXCHANGE_NAME,BuiltinExchangeType.FANOUT);// 声明一个随机队列String queueName = channel.queueDeclare().getQueue();/*队列绑定到交换器上时,是允许绑定多个路由键的,也就是多重绑定*/String[] routekeys = {"king", "mark", "james"};for (String routekey : routekeys) {channel.queueBind(queueName, FanoutProducer.EXCHANGE_NAME,routekey);}System.out.println(" [" + queueName + "] Waiting for messages:");// 创建队列消费者final Consumer consumerA = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println("Received " + envelope.getRoutingKey() + "':'" + message + "'");}};channel.basicConsume(queueName, true, consumerA);}}
输出:
[amq.gen-sWSOqxiXaQN1PZSrKaz59w] Waiting for messages:Received king':'Hello World_1'Received mark':'Hello World_2'Received james':'Hello World_3'
消费者2
import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/***类说明:fanout消费者--绑定一个不存在的路由键*/public class Consumer2 {public static void main(String[] argv) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("zjj101");factory.setUsername("guest");factory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/factory.setVirtualHost("/");// 打开连接和创建频道,与发送端一样Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.exchangeDeclare(FanoutProducer.EXCHANGE_NAME,BuiltinExchangeType.FANOUT);// 声明一个随机队列String queueName = channel.queueDeclare().getQueue();//设置一个不存在的路由键String routekey="xxx";channel.queueBind(queueName, FanoutProducer.EXCHANGE_NAME, routekey);System.out.println(" [*] Waiting for messages......");// 创建队列消费者final Consumer consumerB = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {String message = new String(body, "UTF-8");//记录日志到文件:System.out.println( "Received ["+ envelope.getRoutingKey()+ "] "+message);}};channel.basicConsume(queueName, true, consumerB);}}
输出:
[*] Waiting for messages......Received [king] Hello World_1Received [mark] Hello World_2Received [james] Hello World_3
