image.png

    image.png
    image.png
    1.添加交换机weather_topic
    image.png
    2.发布者

    1. import com.baiqi.rabbitmq.utils.RabbitConstant;
    2. import com.baiqi.rabbitmq.utils.RabbitUtils;
    3. import com.rabbitmq.client.Channel;
    4. import com.rabbitmq.client.Connection;
    5. import java.util.Iterator;
    6. import java.util.LinkedHashMap;
    7. import java.util.Map;
    8. /**
    9. * 发布者
    10. */
    11. public class WeatherBureau {
    12. public static void main(String[] args) throws Exception {
    13. Map area = new LinkedHashMap<String, String>();
    14. area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
    15. area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
    16. area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");
    17. area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");
    18. area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
    19. area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
    20. area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
    21. area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");
    22. Connection connection = RabbitUtils.getConnection();
    23. Channel channel = connection.createChannel();
    24. Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
    25. while (itr.hasNext()) {
    26. Map.Entry<String, String> me = itr.next();
    27. //第一个参数交换机名字 第二个参数作为 消息的routing key
    28. channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, me.getKey(), null, me.getValue().getBytes());
    29. }
    30. channel.close();
    31. connection.close();
    32. }
    33. }

    3.消费者BiaDu

    1. import com.baiqi.rabbitmq.utils.RabbitConstant;
    2. import com.baiqi.rabbitmq.utils.RabbitUtils;
    3. import com.rabbitmq.client.*;
    4. import java.io.IOException;
    5. /**
    6. * 消费者
    7. */
    8. public class BiaDu {
    9. public static void main(String[] args) throws IOException {
    10. Connection connection = RabbitUtils.getConnection();
    11. final Channel channel = connection.createChannel();
    12. channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
    13. //queueBind用于将队列与交换机绑定
    14. //参数1:队列名 参数2:交互机名 参数三:路由key
    15. channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20201127");
    16. // channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128");
    17. channel.basicQos(1);
    18. channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
    19. @Override
    20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    21. System.out.println("百度天气收到气象信息:" + new String(body));
    22. channel.basicAck(envelope.getDeliveryTag() , false);
    23. }
    24. });
    25. }
    26. }

    4.消费者Sina

    1. import com.baiqi.rabbitmq.utils.RabbitConstant;
    2. import com.baiqi.rabbitmq.utils.RabbitUtils;
    3. import com.rabbitmq.client.*;
    4. import java.io.IOException;
    5. /**
    6. * 消费者
    7. */
    8. public class Sina {
    9. public static void main(String[] args) throws IOException {
    10. //获取TCP长连接
    11. Connection connection = RabbitUtils.getConnection();
    12. //获取虚拟连接
    13. final Channel channel = connection.createChannel();
    14. //声明队列信息
    15. channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
    16. //指定队列与交换机以及routing key之间的关系
    17. channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#");
    18. channel.basicQos(1);
    19. channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
    20. @Override
    21. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    22. System.out.println("新浪天气收到气象信息:" + new String(body));
    23. channel.basicAck(envelope.getDeliveryTag() , false);
    24. }
    25. });
    26. }
    27. }

    5.运行结果:
    image.png
    image.png