1.将下面三个文件放入服务器中
2.先安装erlang,再安装socat,再安装rabbitmq
rpm -ivh esl-erlang_23.0.3-1~centos~8_amd64.rpm
rpm -ivh socat-1.7.3.3-2.el8.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.6-1.el8.noarch.rpm
出现warning::Header V3 RSA/SHA256 Signature, key ID ec551f03: NOKEY警告。
解决办法:在rpm 语句后面加上 —force —nodeps就可以了。
3.启动web管理页面
rabbitmq-plugins enable rabbitmq_management
使用http://ip:15672登录,默认用户为guest,密码为guest
你也可以创建一个用户:
rabbitmqctl add_user admin 123456
然后使用账户登录管理页面:
如何在java程序中使用
接下来用java程序执行我们的hello,rabbitmq程序:
首先要导入rabbitmq的依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
然后在web管理页面创建一个虚拟主机:
然后创建一个用户与其绑定,权限也为admin权限:
绑定刚才创建的主机:
点击这个用户名:
然后点击刚才的虚拟主机,再点击set permission
可以看到,ems用户此时已经绑定ems虚拟主机
接下来,我们在java中创建一个生产者的测试类:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Provider {
/**
* @author: zhangyaomin
* @Date: 2020/8/17 10:29
* @Description: RabbitMQ第一种方式:直连,生产消息
*/
@Test
public void testSendMessage() throws IOException, TimeoutException {
//1.创建连接mq的连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置连接mq的主机及其信息
//ip
factory.setHost("xx.xxx.xxx.xxx");
//端口
factory.setPort(5672);
//虚拟主机名
factory.setVirtualHost("/ems");
//用户
factory.setUsername("ems");
//用户密码
factory.setPassword("123456");
}
}
由于我们这次测试需要将通道与消息队列绑定,我们首先需要创建一个消息队列,也可以用代码创建:
//3.获取连接对象
Connection connection = factory.newConnection();
//4.获取连接通道对象
Channel channel = connection.createChannel();
//5.通道绑定消息队列
/*
* 参数1:队列的名称,不存在则自动创建
* 参数2:队列是否持久化
* 参数3:是否独占队列(当前队列只允许当前连接可用)
* 参数4:是否在消费完成自动删除
* 参数5:额外参数
*/
channel.queueDeclare("hello",false,false,false,null);
然后我们创建发布消息:
//6.创建发布消息
/*
* 参数1:交换机名称,此处是直连,所以没有交换机
* 参数2:队列名称
* 参数3:传递消息的额外设置
* 参数4:消息内容
*/
channel.basicPublish("","hello",null,"hello,rabbitmq".getBytes());
//7.关闭通道对象与连接对象
channel.close();
connection.close();
执行该类,然后查看管理页面:
可以看到我们成功的创建了一个名叫hello的队列,生产者的任务就完成了
接下来我们来模拟消费者:
package com.alex.blog.rabbitmq.helloworld;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Customer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接mq的连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置连接mq的主机及其信息
//ip
factory.setHost("xx.xxx.xxx.xxx");
//端口
factory.setPort(5672);
//虚拟主机名
factory.setVirtualHost("/ems");
//用户
factory.setUsername("ems");
//用户密码
factory.setPassword("123456");
//3.获取连接对象
Connection connection = factory.newConnection();
//4.获取连接通道对象
Channel channel = connection.createChannel();
//5.通道绑定消息队列
/*
* 参数1:队列的名称,不存在则自动创建
* 参数2:队列是否持久化
* 参数3:是否独占队列(当前队列只允许当前连接可用)
* 参数4:是否在消费完成自动删除
* 参数5:额外参数
*/
channel.queueDeclare("hello",false,false,false,null);
//6.消费消息
/*
* 参数1:消费哪个队列里的消息
* 参数2:开启消息自动确认机制
* 参数3:消费消息时的回调接口
*/
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override
//用来处理消息时的一个回调
/*
* 参数1:
* 参数2:
* 参数3:
* 参数4:消息队列中取出的消息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息队列中的消息:"+new String(body));
}
});
//7.关闭通道对象与连接对象,不关闭的话会一直监听队列
//channel.close();
//connection.close();
}
}
运行结果: