1.将下面三个文件放入服务器中
1.png

2.先安装erlang,再安装socat,再安装rabbitmq

  1. rpm -ivh esl-erlang_23.0.3-1~centos~8_amd64.rpm
  2. rpm -ivh socat-1.7.3.3-2.el8.x86_64.rpm
  3. rpm -ivh rabbitmq-server-3.8.6-1.el8.noarch.rpm

出现warning::Header V3 RSA/SHA256 Signature, key ID ec551f03: NOKEY警告。

解决办法:在rpm 语句后面加上 —force —nodeps就可以了。

3.启动web管理页面

  1. rabbitmq-plugins enable rabbitmq_management

使用http://ip:15672登录,默认用户为guest,密码为guest

你也可以创建一个用户:

  1. rabbitmqctl add_user admin 123456

然后使用账户登录管理页面:
2.png

如何在java程序中使用

接下来用java程序执行我们的hello,rabbitmq程序:

首先要导入rabbitmq的依赖:

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.9.0</version>
  5. </dependency>

然后在web管理页面创建一个虚拟主机:
3.png

然后创建一个用户与其绑定,权限也为admin权限:
4.png

绑定刚才创建的主机:
5.png

点击这个用户名:
6.png

然后点击刚才的虚拟主机,再点击set permission
7.png

可以看到,ems用户此时已经绑定ems虚拟主机

接下来,我们在java中创建一个生产者的测试类:

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import org.junit.jupiter.api.Test;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. public class Provider {
  8. /**
  9. * @author: zhangyaomin
  10. * @Date: 2020/8/17 10:29
  11. * @Description: RabbitMQ第一种方式:直连,生产消息
  12. */
  13. @Test
  14. public void testSendMessage() throws IOException, TimeoutException {
  15. //1.创建连接mq的连接工厂
  16. ConnectionFactory factory = new ConnectionFactory();
  17. //2.设置连接mq的主机及其信息
  18. //ip
  19. factory.setHost("xx.xxx.xxx.xxx");
  20. //端口
  21. factory.setPort(5672);
  22. //虚拟主机名
  23. factory.setVirtualHost("/ems");
  24. //用户
  25. factory.setUsername("ems");
  26. //用户密码
  27. factory.setPassword("123456");
  28. }
  29. }

由于我们这次测试需要将通道与消息队列绑定,我们首先需要创建一个消息队列,也可以用代码创建:

  1. //3.获取连接对象
  2. Connection connection = factory.newConnection();
  3. //4.获取连接通道对象
  4. Channel channel = connection.createChannel();
  5. //5.通道绑定消息队列
  6. /*
  7. * 参数1:队列的名称,不存在则自动创建
  8. * 参数2:队列是否持久化
  9. * 参数3:是否独占队列(当前队列只允许当前连接可用)
  10. * 参数4:是否在消费完成自动删除
  11. * 参数5:额外参数
  12. */
  13. channel.queueDeclare("hello",false,false,false,null);

然后我们创建发布消息:

  1. //6.创建发布消息
  2. /*
  3. * 参数1:交换机名称,此处是直连,所以没有交换机
  4. * 参数2:队列名称
  5. * 参数3:传递消息的额外设置
  6. * 参数4:消息内容
  7. */
  8. channel.basicPublish("","hello",null,"hello,rabbitmq".getBytes());
  9. //7.关闭通道对象与连接对象
  10. channel.close();
  11. connection.close();

执行该类,然后查看管理页面:
8.png

可以看到我们成功的创建了一个名叫hello的队列,生产者的任务就完成了

接下来我们来模拟消费者:

  1. package com.alex.blog.rabbitmq.helloworld;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class Customer {
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. //1.创建连接mq的连接工厂
  8. ConnectionFactory factory = new ConnectionFactory();
  9. //2.设置连接mq的主机及其信息
  10. //ip
  11. factory.setHost("xx.xxx.xxx.xxx");
  12. //端口
  13. factory.setPort(5672);
  14. //虚拟主机名
  15. factory.setVirtualHost("/ems");
  16. //用户
  17. factory.setUsername("ems");
  18. //用户密码
  19. factory.setPassword("123456");
  20. //3.获取连接对象
  21. Connection connection = factory.newConnection();
  22. //4.获取连接通道对象
  23. Channel channel = connection.createChannel();
  24. //5.通道绑定消息队列
  25. /*
  26. * 参数1:队列的名称,不存在则自动创建
  27. * 参数2:队列是否持久化
  28. * 参数3:是否独占队列(当前队列只允许当前连接可用)
  29. * 参数4:是否在消费完成自动删除
  30. * 参数5:额外参数
  31. */
  32. channel.queueDeclare("hello",false,false,false,null);
  33. //6.消费消息
  34. /*
  35. * 参数1:消费哪个队列里的消息
  36. * 参数2:开启消息自动确认机制
  37. * 参数3:消费消息时的回调接口
  38. */
  39. channel.basicConsume("hello",true,new DefaultConsumer(channel){
  40. @Override
  41. //用来处理消息时的一个回调
  42. /*
  43. * 参数1:
  44. * 参数2:
  45. * 参数3:
  46. * 参数4:消息队列中取出的消息
  47. */
  48. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  49. System.out.println("消息队列中的消息:"+new String(body));
  50. }
  51. });
  52. //7.关闭通道对象与连接对象,不关闭的话会一直监听队列
  53. //channel.close();
  54. //connection.close();
  55. }
  56. }

运行结果:
9.png