fanout广播交换器,只要消息来了,内部会给所以的队列都发一个消息.

生产者

  1. import com.rabbitmq.client.BuiltinExchangeType;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * 类说明:fanout生产者
  9. * 生产了三条消息
  10. */
  11. public class FanoutProducer {
  12. public final static String EXCHANGE_NAME = "fanout_logs";
  13. public static void main(String[] args) throws IOException, TimeoutException {
  14. /**
  15. * 创建连接连接到MabbitMQ
  16. */
  17. ConnectionFactory factory = new ConnectionFactory();
  18. // 设置MabbitMQ所在主机ip或者主机名
  19. factory.setHost("zjj101");
  20. factory.setUsername("guest");
  21. factory.setPassword("guest");
  22. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  23. factory.setVirtualHost("/");
  24. // 创建一个连接
  25. Connection connection = factory.newConnection();
  26. // 创建一个信道
  27. Channel channel = connection.createChannel();
  28. // fanout交换器
  29. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
  30. /*日志消息级别,作为路由键使用*/
  31. String[] routekeys = {"king", "mark", "james"};
  32. for (int i = 0; i < 3; i++) {
  33. String routekey = routekeys[i % 3];//每一次发送一条消息
  34. // 发送的消息
  35. String message = "Hello World_" + (i + 1);
  36. //参数1:exchange name
  37. //参数2:routing key
  38. channel.basicPublish(EXCHANGE_NAME, routekey,
  39. null, message.getBytes());
  40. System.out.println(" [x] Sent '" + routekey + "':'"
  41. + message + "'");
  42. }
  43. // 关闭频道和连接
  44. channel.close();
  45. connection.close();
  46. }
  47. }

消息:

  1. [x] Sent 'king':'Hello World_1'
  2. [x] Sent 'mark':'Hello World_2'
  3. [x] Sent 'james':'Hello World_3'

消费者1

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. /**
  5. * 类说明:fanout消费者--绑定多个路由键
  6. */
  7. public class Consumer1 {
  8. public static void main(String[] argv) throws IOException,
  9. InterruptedException, TimeoutException {
  10. ConnectionFactory factory = new ConnectionFactory();
  11. factory.setHost("zjj101");
  12. factory.setUsername("guest");
  13. factory.setPassword("guest");
  14. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  15. factory.setVirtualHost("/");
  16. // 打开连接和创建频道,与发送端一样
  17. Connection connection = factory.newConnection();
  18. final Channel channel = connection.createChannel();
  19. channel.exchangeDeclare(FanoutProducer.EXCHANGE_NAME,
  20. BuiltinExchangeType.FANOUT);
  21. // 声明一个随机队列
  22. String queueName = channel.queueDeclare().getQueue();
  23. /*队列绑定到交换器上时,是允许绑定多个路由键的,也就是多重绑定*/
  24. String[] routekeys = {"king", "mark", "james"};
  25. for (String routekey : routekeys) {
  26. channel.queueBind(queueName, FanoutProducer.EXCHANGE_NAME,
  27. routekey);
  28. }
  29. System.out.println(" [" + queueName + "] Waiting for messages:");
  30. // 创建队列消费者
  31. final Consumer consumerA = new DefaultConsumer(channel) {
  32. @Override
  33. public void handleDelivery(String consumerTag, Envelope envelope,
  34. AMQP.BasicProperties properties, byte[] body)
  35. throws IOException {
  36. String message = new String(body, "UTF-8");
  37. System.out.println("Received " + envelope.getRoutingKey() + "':'" + message + "'");
  38. }
  39. };
  40. channel.basicConsume(queueName, true, consumerA);
  41. }
  42. }

输出:

  1. [amq.gen-sWSOqxiXaQN1PZSrKaz59w] Waiting for messages:
  2. Received king':'Hello World_1'
  3. Received mark':'Hello World_2'
  4. Received james':'Hello World_3'

消费者2

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. /**
  5. *类说明:fanout消费者--绑定一个不存在的路由键
  6. */
  7. public class Consumer2 {
  8. public static void main(String[] argv) throws IOException, TimeoutException {
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("zjj101");
  11. factory.setUsername("guest");
  12. factory.setPassword("guest");
  13. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  14. factory.setVirtualHost("/");
  15. // 打开连接和创建频道,与发送端一样
  16. Connection connection = factory.newConnection();
  17. final Channel channel = connection.createChannel();
  18. channel.exchangeDeclare(FanoutProducer.EXCHANGE_NAME,
  19. BuiltinExchangeType.FANOUT);
  20. // 声明一个随机队列
  21. String queueName = channel.queueDeclare().getQueue();
  22. //设置一个不存在的路由键
  23. String routekey="xxx";
  24. channel.queueBind(queueName, FanoutProducer.EXCHANGE_NAME, routekey);
  25. System.out.println(" [*] Waiting for messages......");
  26. // 创建队列消费者
  27. final Consumer consumerB = new DefaultConsumer(channel) {
  28. @Override
  29. public void handleDelivery(String consumerTag,
  30. Envelope envelope,
  31. AMQP.BasicProperties properties,
  32. byte[] body)
  33. throws IOException {
  34. String message = new String(body, "UTF-8");
  35. //记录日志到文件:
  36. System.out.println( "Received ["+ envelope.getRoutingKey()
  37. + "] "+message);
  38. }
  39. };
  40. channel.basicConsume(queueName, true, consumerB);
  41. }
  42. }

输出:

  1. [*] Waiting for messages......
  2. Received [king] Hello World_1
  3. Received [mark] Hello World_2
  4. Received [james] Hello World_3