1.步骤

  1. 创建连接工厂
  2. 获取连接connection
  3. 通过连接获取通道
  4. 通过创建交换机,声明队列,绑定关系,路由key,发送消息 和接受消息
  5. 准备消息内容
  6. 发送消息给队列
  7. 关闭连接通道

    2.生产者代码

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. public class Producer {
  5. private static final String QUEUE_NAME = "LHM";
  6. public static void main(String[] args) {
  7. //所有的中间件的技术都是基于 tcp/ip 协议基础上构建的新型协议规范 不过rabbitmq 遵循的amqp
  8. // ip port
  9. //1.创建连接工厂
  10. ConnectionFactory factory = new ConnectionFactory();
  11. factory.setHost("39.102.67.107");
  12. factory.setPort(5672);
  13. factory.setUsername("admin");
  14. factory.setPassword("123456");
  15. factory.setVirtualHost("/");
  16. //2.获取连接connection
  17. Connection connection = null;
  18. Channel channel = null;
  19. try {
  20. connection = factory.newConnection("生产者");
  21. //3.通过连接获取通道
  22. channel = connection.createChannel();
  23. //4.通过创建交换机,声明队列,绑定关系,路由key,发送消息 和接受消息
  24. /**
  25. * @param1:队列的名称
  26. * @param2:是否要持久化 durable = false 所谓的持久化消息是否存盘,如果false,非持久化 true 是持久化
  27. * @param3:排他性 是否是独占队列
  28. * @param4:是否自动删除,随着最后一个消费者发送消息完毕是否把队列删除
  29. * @param5:携带一些附属参数
  30. */
  31. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  32. //5.准备消息内容
  33. String message = "hello";
  34. //6.发送消息给队列
  35. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. } finally {
  39. //7.关闭连接通道
  40. if (channel != null && channel.isOpen()){
  41. try {
  42. channel.close();
  43. } catch (Exception e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. if (connection != null && connection.isOpen()){
  48. try {
  49. connection.close();
  50. } catch (Exception e) {
  51. e.printStackTrace();
  52. }
  53. }
  54. }
  55. }
  56. }

3.消费者代码

  1. import com.rabbitmq.client.*;
  2. import java.nio.charset.StandardCharsets;
  3. public class Consumer {
  4. private static final String QUEUE_NAME = "LHM";
  5. public static void main(String[] args) {
  6. //所有的中间件的技术都是基于 tcp/ip 协议基础上构建的新型协议规范 不过rabbitmq 遵循的amqp
  7. // ip port
  8. //1.创建连接工厂
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("39.102.67.107");
  11. factory.setPort(5672);
  12. factory.setUsername("admin");
  13. factory.setPassword("123456");
  14. factory.setVirtualHost("/");
  15. //2.获取连接connection
  16. Connection connection = null;
  17. Channel channel = null;
  18. try {
  19. connection = factory.newConnection("生产者");
  20. //3.通过连接获取通道
  21. channel = connection.createChannel();
  22. //4.通过创建交换机,声明队列,绑定关系,路由key,发送消息 和接受消息
  23. channel.basicConsume(QUEUE_NAME, true, (consumerTag, message) -> {
  24. System.out.println("收到的消息为:" + new String(message.getBody(), StandardCharsets.UTF_8));
  25. }, consumerTag -> {
  26. System.out.println("消息接受失败了");
  27. });
  28. System.out.println("开启接受消息");
  29. System.in.read();
  30. } catch (Exception e) {
  31. e.printStackTrace();
  32. } finally {
  33. //7.关闭连接通道
  34. if (channel != null && channel.isOpen()){
  35. try {
  36. channel.close();
  37. } catch (Exception e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. if (connection != null && connection.isOpen()){
  42. try {
  43. connection.close();
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. }
  49. }
  50. }