windows环境搭建:
1.下载并安装erlang,下载地址:http://www.erlang.org/download
2.配置erlang环境变量信息
新增环境变量ERLANG_HOME=erlang的安装地址
将%ERLANG_HOME%\bin加入到path中
3.下载并安装RabbitMQ,下载地址:http://www.rabbitmq.com/download.html
注意: RabbitMQ 它依赖于Erlang,需要先安装Erlang。
RabitMQ管理平台中心
RabbitMQ 管理平台地址 http://127.0.0.1:15672
默认账号:guest/guest 用户可以自己创建新的账号
Virtual Hosts:
像mysql有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢?
RabbitMQ也有类似的权限管理。在RabbitMQ中可以虚拟消息服务器VirtualHost,每
个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互
隔离的。exchange、queue、message不能互通。
默认的端口15672:rabbitmq管理平台端口号
默认的端口5672: rabbitmq消息中间内部通讯的端口
默认的端口号25672 rabbitmq集群的端口号
linux环境搭建(Centos7):
yum install epel-release
yum install rabbitmq-server
systemctl start rabbitmq-server
rabbitmq-plugins enable rabbitmq_management
添加用户
rabbitmqctl add_user wn wn123 用户名 密码
把用户设置为管理员
rabbitmqctl set_user_tags wn administrator
给用户赋予所有权限
rabbitmqctl set_permissions -p / wn ".*" ".*" ".*"
1、对比:
ActiveMQ遵循JMS规范,但是迭代速度很慢,不怎么维护了,并且高并发性能不怎么好,有可能会丢失消息;
RabbitMQ是Erlang语言编写的,继承了Erlang语言天生的为并发而生的性能,稳定性与安全性也有保障;但是不支持动态扩展;
Kafaka依赖Zookeeper,可动态扩展节点,高性能高吞吐量、无限扩容、消息可以指定追溯;但是有着严格的顺序机制,不支持消息优先级;
2、RabbitMQ支持多virtual host,不同团队可共用一个RabbitMQ,只要建立不同的virtual host就好了,可以进行相互隔离;互不影响;
3、普通队列与公平队列(点对点)
普通:
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
</dependencies>
public class RabitMQConnection {
public static Connection getConnection() throws IOException, TimeoutException {
// 1.创建我们的连接
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置我们的连接地址
connectionFactory.setHost("127.0.0.1");
// 3.设置我们的端口号
connectionFactory.setPort(5672);
// 4.设置账号和密码
connectionFactory.setUsername("meite");
connectionFactory.setPassword("meite");
// 5.设置VirtualHost
connectionFactory.setVirtualHost("/meite_rabbitmq");
return connectionFactory.newConnection();
}
}
public class Producer {
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 1.获取连接
Connection newConnection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = newConnection.createChannel();
// 3.创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "test_yushengjun110";
System.out.println("生产者发送消息:" + msg);
// 4.发送消息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.close();
newConnection.close();
}
}
public class Customer {
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("002");
// 1.获取连接
Connection newConnection = MQConnectionUtils.newConnection();
// 2.获取通道
Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("消费者获取消息:" + msgString);
}
};
// 3.监听队列
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
公平:work queue
public class Producer {
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 1.获取连接
Connection newConnection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = newConnection.createChannel();
// 3.创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 1; i <= 50; i++) {
String msg = "test_yushengjun" + i;
System.out.println("生产者发送消息:" + msg);
// 4.发送消息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}
channel.close();
newConnection.close();
}
}
public class Customer1 {
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("001");
// 1.获取连接
Connection newConnection = MQConnectionUtils.newConnection();
// 2.获取通道
final Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("消费者获取消息:" + msgString);
try {
Thread.sleep(1000);
} catch (Exception e) {
} finally {
// 手动回执消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 3.监听队列
//设置应答模式,如果为true情况下,表示自动应答模式,false表示手动应答
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}
保证消息不丢失:
通过confirm机制确认发送消息成功(向mqserver发送消息,如果接收到采用ack机制通知mqclient说明发送成功)
public class Producer {
private static final String QUEUE_NAME = "mayikt";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
System.out.println("生产者启动成功..");
// 1.创建我们的连接
Connection connection = RabitMQConnection.getConnection();
// 2.创建我们通道
Channel channel = connection.createChannel();
// for (int i = 0; i < 10; i++) {
// 开启生产确认消息投递机制
channel.confirmSelect();
int i = 1;
String msg = "每特教育第六期突破3万月薪" + i;
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
if (channel.waitForConfirms()) {
System.out.println("生产者发送消息成功:" + msg);
}
channel.close();
connection.close();
}
}
消费者采用ack机制通知给mqserver才会给消费者发送下一条消息(消费者启动的时候会自动去mqserver拉去消息,如果后边有新消息则mqserver会会将消息自动推送给消费者)
public class Consumer {
private static final String QUEUE_NAME = "mayikt";
// 业务操作等待时间
private static int time = 1000;
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println(">>>消费者02启动:" + time);
// 1.创建我们的连接
Connection connection = RabitMQConnection.getConnection();
// 2.创建我们通道
final Channel channel = connection.createChannel();
// MQ每次只会给消费者发送一条消息,必须返回ack之后才会继续发送消息给消费者
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费消息msg:" + msg);
try {
Thread.sleep(time);
} catch (Exception e) {
}
// 手动发送消息告诉给mq服务器端 从队列删除该消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 3.创建我们的监听的消息 auto Ack 默认自动签收 必须手动ack
channel.basicConsume(QUEUE_NAME, false
, defaultConsumer);
}
}
至于mq服务端怎样做保证消息的安全:
在我们创建队列和交换机的时候都有一个属性Durability,值可以为Durable和Tmep;
如果当我们的mqserver刚要做消息磁盘持久化的时候,服务器挂了,那我们可以在发送消息前搞个日志表记录下,然后消费者消费也搞个日志表,做个定时同步补发就好;