


1. 添加交换机 weather_topic(类型选择 topic;下面的代码不会声明交换机,需要在运行前先创建好)
2.发布者
import com.baiqi.rabbitmq.utils.RabbitConstant;import com.baiqi.rabbitmq.utils.RabbitUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import java.util.Iterator;import java.util.LinkedHashMap;import java.util.Map;/*** 发布者*/public class WeatherBureau {public static void main(String[] args) throws Exception {Map area = new LinkedHashMap<String, String>();area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();while (itr.hasNext()) {Map.Entry<String, String> me = itr.next();//第一个参数交换机名字 第二个参数作为 消息的routing keychannel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, me.getKey(), null, me.getValue().getBytes());}channel.close();connection.close();}}
3.消费者BiaDu
import com.baiqi.rabbitmq.utils.RabbitConstant;import com.baiqi.rabbitmq.utils.RabbitUtils;import com.rabbitmq.client.*;import java.io.IOException;/*** 消费者*/public class BiaDu {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名 参数三:路由keychannel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20201127");// channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度天气收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}}
4.消费者Sina
import com.baiqi.rabbitmq.utils.RabbitConstant;import com.baiqi.rabbitmq.utils.RabbitUtils;import com.rabbitmq.client.*;import java.io.IOException;/*** 消费者*/public class Sina {public static void main(String[] args) throws IOException {//获取TCP长连接Connection connection = RabbitUtils.getConnection();//获取虚拟连接final Channel channel = connection.createChannel();//声明队列信息channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);//指定队列与交换机以及routing key之间的关系channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪天气收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}}
5.运行结果:

