fanout广播交换器,只要消息来了,内部会给所以的队列都发一个消息.
生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 类说明:fanout生产者
* 生产了三条消息
*/
public class FanoutProducer {
public final static String EXCHANGE_NAME = "fanout_logs";
public static void main(String[] args) throws IOException, TimeoutException {
/**
* 创建连接连接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
// 设置MabbitMQ所在主机ip或者主机名
factory.setHost("zjj101");
factory.setUsername("guest");
factory.setPassword("guest");
/*设置虚拟主机地址,不能设置错了,不然会报错误*/
factory.setVirtualHost("/");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个信道
Channel channel = connection.createChannel();
// fanout交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
/*日志消息级别,作为路由键使用*/
String[] routekeys = {"king", "mark", "james"};
for (int i = 0; i < 3; i++) {
String routekey = routekeys[i % 3];//每一次发送一条消息
// 发送的消息
String message = "Hello World_" + (i + 1);
//参数1:exchange name
//参数2:routing key
channel.basicPublish(EXCHANGE_NAME, routekey,
null, message.getBytes());
System.out.println(" [x] Sent '" + routekey + "':'"
+ message + "'");
}
// 关闭频道和连接
channel.close();
connection.close();
}
}
消息:
[x] Sent 'king':'Hello World_1'
[x] Sent 'mark':'Hello World_2'
[x] Sent 'james':'Hello World_3'
消费者1
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 类说明:fanout消费者--绑定多个路由键
*/
public class Consumer1 {
public static void main(String[] argv) throws IOException,
InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("zjj101");
factory.setUsername("guest");
factory.setPassword("guest");
/*设置虚拟主机地址,不能设置错了,不然会报错误*/
factory.setVirtualHost("/");
// 打开连接和创建频道,与发送端一样
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(FanoutProducer.EXCHANGE_NAME,
BuiltinExchangeType.FANOUT);
// 声明一个随机队列
String queueName = channel.queueDeclare().getQueue();
/*队列绑定到交换器上时,是允许绑定多个路由键的,也就是多重绑定*/
String[] routekeys = {"king", "mark", "james"};
for (String routekey : routekeys) {
channel.queueBind(queueName, FanoutProducer.EXCHANGE_NAME,
routekey);
}
System.out.println(" [" + queueName + "] Waiting for messages:");
// 创建队列消费者
final Consumer consumerA = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received " + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumerA);
}
}
输出:
[amq.gen-sWSOqxiXaQN1PZSrKaz59w] Waiting for messages:
Received king':'Hello World_1'
Received mark':'Hello World_2'
Received james':'Hello World_3'
消费者2
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*类说明:fanout消费者--绑定一个不存在的路由键
*/
public class Consumer2 {
public static void main(String[] argv) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("zjj101");
factory.setUsername("guest");
factory.setPassword("guest");
/*设置虚拟主机地址,不能设置错了,不然会报错误*/
factory.setVirtualHost("/");
// 打开连接和创建频道,与发送端一样
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(FanoutProducer.EXCHANGE_NAME,
BuiltinExchangeType.FANOUT);
// 声明一个随机队列
String queueName = channel.queueDeclare().getQueue();
//设置一个不存在的路由键
String routekey="xxx";
channel.queueBind(queueName, FanoutProducer.EXCHANGE_NAME, routekey);
System.out.println(" [*] Waiting for messages......");
// 创建队列消费者
final Consumer consumerB = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
//记录日志到文件:
System.out.println( "Received ["+ envelope.getRoutingKey()
+ "] "+message);
}
};
channel.basicConsume(queueName, true, consumerB);
}
}
输出:
[*] Waiting for messages......
Received [king] Hello World_1
Received [mark] Hello World_2
Received [james] Hello World_3