交换机

在RabbitMQ的可视化界面添加交换机。

image.png
这里type选fanout, 指的是,会将消息发送到所有绑定到了的queue上。具体其他的type参考RabbitMQ中exchange的不同类型

消费者

消费者1:
注意要将queue绑定到exchange上

  1. package com.lang.rabbitmq.pubsub;
  2. import com.lang.rabbitmq.RabbitConstant;
  3. import com.lang.rabbitmq.RabbitUtils;
  4. import com.rabbitmq.client.*;
  5. import java.io.IOException;
  6. public class Sina {
  7. public static void main(String[] args) throws IOException {
  8. Connection connection = RabbitUtils.getConnection();
  9. Channel channel = connection.createChannel();
  10. channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
  11. // 绑定queue到exchange上
  12. // 第一个参数:Queue的名称
  13. // 第二个参数:路由器的名称
  14. // 第三个参数:路由key, 因为这里我们采用的是发布订阅模式,将消息传递到每一个queue里去,所以用不到key
  15. channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");
  16. channel.basicQos(1);
  17. channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
  18. @Override
  19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20. System.out.println("新浪收到消息:" + new String(body));
  21. channel.basicAck(envelope.getDeliveryTag(), false);
  22. }
  23. });
  24. }
  25. }

消费者2:

与消费者1类似,不同的是queue不同。

  1. package com.lang.rabbitmq.pubsub;
  2. import com.lang.rabbitmq.RabbitConstant;
  3. import com.lang.rabbitmq.RabbitUtils;
  4. import com.rabbitmq.client.*;
  5. import java.io.IOException;
  6. public class Baidu {
  7. public static void main(String[] args) throws IOException {
  8. Connection connection = RabbitUtils.getConnection();
  9. Channel channel = connection.createChannel();
  10. channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
  11. // 绑定queue到exchange上
  12. // 第一个参数:Queue的名称
  13. // 第二个参数:路由器的名称
  14. // 第三个参数:路由key, 因为这里我们采用的是发布订阅模式,将消息传递到每一个queue里去,所以用不到key
  15. channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
  16. channel.basicQos(1);
  17. channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
  18. @Override
  19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20. System.out.println("百度收到消息:" + new String(body));
  21. channel.basicAck(envelope.getDeliveryTag(), false);
  22. }
  23. });
  24. }
  25. }

生产者

消息发送给exchange,而不是发送给Queue。

  1. package com.lang.rabbitmq.pubsub;
  2. import com.lang.rabbitmq.RabbitConstant;
  3. import com.lang.rabbitmq.RabbitUtils;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. import java.io.IOException;
  7. import java.util.Scanner;
  8. import java.util.concurrent.TimeoutException;
  9. public class WeatherBureau {
  10. public static void main(String[] args) throws IOException, TimeoutException {
  11. Connection connection = RabbitUtils.getConnection();
  12. Channel channel = connection.createChannel();
  13. String input = new Scanner(System.in).next();
  14. // 第一个参数:交换机的名字;
  15. // 第二个参数:队列的名字,因为我们这里是直接发给交换机,所以不用队列名,由交换机去发给所有绑定的队列。
  16. channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, input.getBytes());
  17. channel.close();
  18. connection.close();
  19. }
  20. }