安装

docker

  1. docker pull rabbitmq
  2. docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v /data/rabbimq:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq
  3. docker exec -it rabbit rabbitmq-plugins enable rabbitmq_management

-d 后台运行容器;
—name 指定容器名;
-p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);
-v 映射目录或文件;
—hostname 主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);
-e 指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)

延迟队列

  1. package com.huiyou.washing;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.HashMap;
  7. import java.util.Map;
  8. import java.util.concurrent.TimeoutException;
  9. /**
  10. * Created by hidden on 2017/2/7.
  11. */
  12. public class DelayTest {
  13. public static final String ip = "192.168.31.232";
  14. public static final int port = 5672;
  15. public static final String username = "admin";
  16. public static final String password = "rabbit_0921";
  17. public static final String queueName = "queue.ttl.test";
  18. public static final String exchangeName = "exchange.ttl.test";
  19. public static final String routingKey = "ttl";
  20. public static final Boolean durable = true;
  21. public static final Boolean exclusive = false;
  22. public static final Boolean autoDelete = false;
  23. public static void main(String[] args) {
  24. try {
  25. Connection connection = getConnection();
  26. Channel channel = connection.createChannel();
  27. channel.exchangeDeclare("dlx.exchange", "direct");
  28. channel.queueDeclare("dlx.queue", true, false, false, null);
  29. channel.queueBind("dlx.queue", "dlx.exchange", "dlx.routingKey");
  30. //创建测试超时的Exchange及Queue
  31. channel.exchangeDeclare("delay.exchange", "direct");
  32. Map<String, Object> arguments = new HashMap<>();
  33. //过期时间10s
  34. arguments.put("x-message-ttl", 10000);
  35. //绑定DLX
  36. arguments.put("x-dead-letter-exchange", "dlx.exchange");
  37. //绑定发送到DLX的RoutingKey
  38. arguments.put("x-dead-letter-routing-key", "dlx.routingKey");
  39. channel.queueDeclare("delay.queue", true, false, false, arguments);
  40. channel.queueBind("delay.queue", "delay.exchange", "delay.routingKey");
  41. //发布一条消息
  42. channel.basicPublish("delay.exchange", "delay.routingKey", null, "该消息将在10s后发送到延迟队列".getBytes());
  43. channel.close();
  44. connection.close();
  45. } catch (IOException e) {
  46. e.printStackTrace();
  47. } catch (TimeoutException e) {
  48. e.printStackTrace();
  49. }
  50. }
  51. private static Connection getConnection() throws IOException, TimeoutException {
  52. ConnectionFactory factory = new ConnectionFactory();
  53. factory.setHost(ip);
  54. factory.setPort(port);
  55. factory.setUsername(username);
  56. factory.setPassword(password);
  57. return factory.newConnection();
  58. }
  59. }