交换机
在RabbitMQ的可视化界面添加交换机。
这里type选fanout, 指的是,会将消息发送到所有绑定到了的queue上。具体其他的type参考RabbitMQ中exchange的不同类型
消费者
消费者1:
注意要将queue绑定到exchange上
package com.lang.rabbitmq.pubsub;
import com.lang.rabbitmq.RabbitConstant;
import com.lang.rabbitmq.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Sina {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
// 绑定queue到exchange上
// 第一个参数:Queue的名称
// 第二个参数:路由器的名称
// 第三个参数:路由key, 因为这里我们采用的是发布订阅模式,将消息传递到每一个queue里去,所以用不到key
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新浪收到消息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
消费者2:
与消费者1类似,不同的是queue不同。
package com.lang.rabbitmq.pubsub;
import com.lang.rabbitmq.RabbitConstant;
import com.lang.rabbitmq.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Baidu {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
// 绑定queue到exchange上
// 第一个参数:Queue的名称
// 第二个参数:路由器的名称
// 第三个参数:路由key, 因为这里我们采用的是发布订阅模式,将消息传递到每一个queue里去,所以用不到key
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度收到消息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
生产者
消息发送给exchange,而不是发送给Queue。
package com.lang.rabbitmq.pubsub;
import com.lang.rabbitmq.RabbitConstant;
import com.lang.rabbitmq.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class WeatherBureau {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
String input = new Scanner(System.in).next();
// 第一个参数:交换机的名字;
// 第二个参数:队列的名字,因为我们这里是直接发给交换机,所以不用队列名,由交换机去发给所有绑定的队列。
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, input.getBytes());
channel.close();
connection.close();
}
}