代码实现整个消息投递过程
public class Producer {private static Logger log = LoggerFactory.getLogger(Producer.class);public static void main(String[] args) {// 步骤// 1. 创建连接工厂log.info("1. 创建连接工厂");ConnectionFactory connectionFactory = new ConnectionFactory();//配置相关配置项connectionFactory.setHost("112.74.175.76");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");//声明连接和通道Connection connection = null;Channel channel = null;try {// 2. 创建连接 Connectionlog.info("2. 创建连接 Connection");connection = connectionFactory.newConnection("生产者");// 3. 通过连接获取通道Chanellog.info("3. 通过连接获取通道Chanel");channel = connection.createChannel();// 5. 准备消息log.info("5. 准备消息");String msg = "投递消息到 direct-message-exchange "+System.currentTimeMillis();// 6. 准备新的交换机log.info("6. 准备新的交换机");String exchangeName = "direct-message-exchange";// 7. 定义路由key// log.info("7. 定义路由key");// String routingKey = "message";// 8. 定义交换机类型log.info("8. 定义交换机类型");String exchangeType = "direct";//如果交换机不存在需要先声明交换机 1. 代码声明 2. web界面/*** @param 交换机名称* @param 交换机类型* @param 是否持久化*/channel.exchangeDeclare(exchangeName, exchangeType, true);//声明队列/*** @param 队列名称* @param 是否持久化* @param 是否独占队列(排他性,一般不设置为排他)* @param 是否自动删除(最后一个消费者消费结束是否自动删除,一般不会自动删除)* @param 附加参数map**/channel.queueDeclare("q5",true,false,false,null);channel.queueDeclare("q6",true,false,false,null);channel.queueDeclare("q7",true,false,false,null);//交换机绑定队列/*** @param 队列名称* @param 交换机名称* @param 路由key*/channel.queueBind("q5", exchangeName, "order");channel.queueBind("q6", exchangeName, "order");channel.queueBind("q7", exchangeName, "course");//发送消息/*** @param 交换机* @param 队列名称/路由key* @param 属性配置* @param 发送消息内容*/channel.basicPublish(exchangeName,"course",null,msg.getBytes());log.info("消息发送成功:{}",msg);} catch (Exception e) {log.info("消息发送异常");e.printStackTrace();} finally {log.info("关闭连接");// 7. 关闭通道if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {try {channel.close();} catch (IOException | TimeoutException e) {e.printStackTrace();}}// 8. 关闭连接if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}}
