交换机
在RabbitMQ的可视化界面添加交换机。

这里type选fanout, 指的是,会将消息发送到所有绑定到了的queue上。具体其他的type参考RabbitMQ中exchange的不同类型
消费者
消费者1:
注意要将queue绑定到exchange上
package com.lang.rabbitmq.pubsub;import com.lang.rabbitmq.RabbitConstant;import com.lang.rabbitmq.RabbitUtils;import com.rabbitmq.client.*;import java.io.IOException;public class Sina {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);// 绑定queue到exchange上// 第一个参数:Queue的名称// 第二个参数:路由器的名称// 第三个参数:路由key, 因为这里我们采用的是发布订阅模式,将消息传递到每一个queue里去,所以用不到keychannel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪收到消息:" + new String(body));channel.basicAck(envelope.getDeliveryTag(), false);}});}}
消费者2:
与消费者1类似,不同的是queue不同。
package com.lang.rabbitmq.pubsub;import com.lang.rabbitmq.RabbitConstant;import com.lang.rabbitmq.RabbitUtils;import com.rabbitmq.client.*;import java.io.IOException;public class Baidu {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);// 绑定queue到exchange上// 第一个参数:Queue的名称// 第二个参数:路由器的名称// 第三个参数:路由key, 因为这里我们采用的是发布订阅模式,将消息传递到每一个queue里去,所以用不到keychannel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度收到消息:" + new String(body));channel.basicAck(envelope.getDeliveryTag(), false);}});}}
生产者
消息发送给exchange,而不是发送给Queue。
package com.lang.rabbitmq.pubsub;import com.lang.rabbitmq.RabbitConstant;import com.lang.rabbitmq.RabbitUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import java.io.IOException;import java.util.Scanner;import java.util.concurrent.TimeoutException;public class WeatherBureau {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();String input = new Scanner(System.in).next();// 第一个参数:交换机的名字;// 第二个参数:队列的名字,因为我们这里是直接发给交换机,所以不用队列名,由交换机去发给所有绑定的队列。channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, input.getBytes());channel.close();connection.close();}}
