

1.还是以通配符模式的配置继续来进行代码演示
2.发布者
import com.baiqi.rabbitmq.utils.RabbitConstant;import com.baiqi.rabbitmq.utils.RabbitUtils;import com.rabbitmq.client.*;import java.io.IOException;import java.util.Iterator;import java.util.LinkedHashMap;import java.util.Map;import java.util.concurrent.TimeoutException;public class WeatherBureau {public static void main(String[] args) throws IOException, TimeoutException {Map area = new LinkedHashMap<String, String>();area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();//开启confirm监听模式channel.confirmSelect();channel.addConfirmListener(new ConfirmListener() {public void handleAck(long l, boolean b) throws IOException {//第二个参数代表接收的数据是否为批量接收,一般我们用不到。System.out.println("消息已被Broker接收,Tag:" + l);}public void handleNack(long l, boolean b) throws IOException {System.out.println("消息已被Broker拒收,Tag:" + l);}});channel.addReturnListener(new ReturnCallback() {public void handle(Return r) {System.err.println("===========================");System.err.println("Return编码:" + r.getReplyCode() + "-Return描述:" + r.getReplyText());System.err.println("交换机:" + r.getExchange() + "-路由key:" + r.getRoutingKey());System.err.println("Return主题:" + new String(r.getBody()));System.err.println("===========================");}});Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();while (itr.hasNext()) {Map.Entry<String, String> me = itr.next();//Routing key 第二个参数相当于数据筛选的条件//第三个参数为:mandatory true代表如果消息无法正常投递则return回生产者,如果false,则直接将消息放弃。channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, me.getKey(), true, null, me.getValue().getBytes());}//如果关闭则无法进行监听,因此此处不需要关闭/*channel.close();connection.close();*/}}
3.消费者Baidu
import com.baiqi.rabbitmq.utils.RabbitConstant;import com.baiqi.rabbitmq.utils.RabbitUtils;import com.rabbitmq.client.*;import java.io.IOException;public class Baidu {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名 参数三:路由keychannel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20201127");//channel.queueUnbind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20201127");//*.hebei.*.*channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度天气收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag(), false);}});}}
4.消费者Sina
import com.baiqi.rabbitmq.utils.RabbitConstant;import com.baiqi.rabbitmq.utils.RabbitUtils;import com.rabbitmq.client.*;import java.io.IOException;public class Sina {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("腾讯天气收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}}
5.运行结果
