代码实现整个消息投递过程

    1. public class Producer {
    2. private static Logger log = LoggerFactory.getLogger(Producer.class);
    3. public static void main(String[] args) {
    4. // 步骤
    5. // 1. 创建连接工厂
    6. log.info("1. 创建连接工厂");
    7. ConnectionFactory connectionFactory = new ConnectionFactory();
    8. //配置相关配置项
    9. connectionFactory.setHost("112.74.175.76");
    10. connectionFactory.setPort(5672);
    11. connectionFactory.setUsername("admin");
    12. connectionFactory.setPassword("admin");
    13. connectionFactory.setVirtualHost("/");
    14. //声明连接和通道
    15. Connection connection = null;
    16. Channel channel = null;
    17. try {
    18. // 2. 创建连接 Connection
    19. log.info("2. 创建连接 Connection");
    20. connection = connectionFactory.newConnection("生产者");
    21. // 3. 通过连接获取通道Chanel
    22. log.info("3. 通过连接获取通道Chanel");
    23. channel = connection.createChannel();
    24. // 5. 准备消息
    25. log.info("5. 准备消息");
    26. String msg = "投递消息到 direct-message-exchange "+System.currentTimeMillis();
    27. // 6. 准备新的交换机
    28. log.info("6. 准备新的交换机");
    29. String exchangeName = "direct-message-exchange";
    30. // 7. 定义路由key
    31. // log.info("7. 定义路由key");
    32. // String routingKey = "message";
    33. // 8. 定义交换机类型
    34. log.info("8. 定义交换机类型");
    35. String exchangeType = "direct";
    36. //如果交换机不存在需要先声明交换机 1. 代码声明 2. web界面
    37. /**
    38. * @param 交换机名称
    39. * @param 交换机类型
    40. * @param 是否持久化
    41. */
    42. channel.exchangeDeclare(exchangeName, exchangeType, true);
    43. //声明队列
    44. /**
    45. * @param 队列名称
    46. * @param 是否持久化
    47. * @param 是否独占队列(排他性,一般不设置为排他)
    48. * @param 是否自动删除(最后一个消费者消费结束是否自动删除,一般不会自动删除)
    49. * @param 附加参数map
    50. *
    51. */
    52. channel.queueDeclare("q5",true,false,false,null);
    53. channel.queueDeclare("q6",true,false,false,null);
    54. channel.queueDeclare("q7",true,false,false,null);
    55. //交换机绑定队列
    56. /**
    57. * @param 队列名称
    58. * @param 交换机名称
    59. * @param 路由key
    60. */
    61. channel.queueBind("q5", exchangeName, "order");
    62. channel.queueBind("q6", exchangeName, "order");
    63. channel.queueBind("q7", exchangeName, "course");
    64. //发送消息
    65. /**
    66. * @param 交换机
    67. * @param 队列名称/路由key
    68. * @param 属性配置
    69. * @param 发送消息内容
    70. */
    71. channel.basicPublish(exchangeName,"course",null,msg.getBytes());
    72. log.info("消息发送成功:{}",msg);
    73. } catch (Exception e) {
    74. log.info("消息发送异常");
    75. e.printStackTrace();
    76. } finally {
    77. log.info("关闭连接");
    78. // 7. 关闭通道
    79. if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {
    80. try {
    81. channel.close();
    82. } catch (IOException | TimeoutException e) {
    83. e.printStackTrace();
    84. }
    85. }
    86. // 8. 关闭连接
    87. if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
    88. try {
    89. connection.close();
    90. } catch (IOException e) {
    91. e.printStackTrace();
    92. }
    93. }
    94. }
    95. }
    96. }