消息队列

  1. MQ(message queue),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message
  2. 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游
  3. “逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。

消息队列功能

  1. 1.流量消峰
  2. 在并发场景下,当我们的后台系统抗压能力不足以处理流量时,可以引入消息队列进行流量消峰,通过
  3. 限流保证后台系统的稳定运作。
  4. 2.应用解耦
  5. 在分布式应用中各个模块之间的相互调用存在诸多问题,可能因为网络不可达造成整条调用链路失败,
  6. 引入消息中间件可以解耦上游应用对下游应用的依赖,生产者只需要把消息投递到队列中,后面由消费者
  7. 进行消费,不会因为生产者出现故障而导致调用链路故障。
  8. 3.异步处理
  9. 将非重要业务消息进行异步处理,节省时间。以下单为例,我们购买东西付款成功后,有两个业务,一是
  10. 同步到支付宝短信通知已消费XX元,二是下单成功后淘宝账号加积分,对这两个消息进行异步处理可以节省
  11. 所消耗的时间。

MQ产品

  1. 1.ActiveMQ
  2. 优点:单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据。
  3. 2.Kafka
  4. 大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开 Kafka,这款为大数据而生的消息中间件,
  5. 以其百万级 TPS 的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥
  6. 着举足轻重的作用。目前已经被 LinkedInUber, Twitter, Netflix 等大公司所采纳。
  7. 优点: 性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是吞吐量高。时效性 ms 级可用性非
  8. 常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采
  9. Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方
  10. Kafka Web 管理界面 Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持:
  11. 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用
  12. 缺点:Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消
  13. 息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,
  14. 但是一台代理宕机后,就会产生消息乱序,社区更新较慢;
  15. 3.RocketMQ
  16. RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一
  17. 些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场
  18. 景。
  19. 优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,还是分
  20. 布式的,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java 我们可以自己阅
  21. 读源码,定制自己公司的 MQ
  22. 缺点:支持的客户端语言不多,目前是 java c++,其中 c++不成熟;社区活跃度一般,没有在 MQ
  23. 核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码
  24. 4.RabbitMQ
  25. 2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最
  26. 主流的消息中间件之一。
  27. 优点:由于erlang语言的高并发特性,性能较好;吞吐量到万级,MQ功能比较完备,健壮、稳定、易
  28. 用、跨平台、支持多种语言如:PythonRuby、.NETJavaJMSCPHPActionScriptXMPPSTOMP
  29. 等,支持AJAX文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高。
  30. 缺点:商业版需要收费,学习成本较高。

四大核心概念

  1. 1.生产者
  2. 产生数据发送消息的程序是生产者。
  3. 2.交换机
  4. 交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息
  5. 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推
  6. 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定。
  7. 3.队列
  8. 队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存
  9. 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可
  10. 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式。
  11. 4.消费者
  12. 消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费。

专业名词

  1. Broker
  2. 接收和分发消息的应用,RabbitMQ Server就是Message Broker(包括ExchangeQueue)。
  3. Virtual host
  4. 出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似
  5. 于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,
  6. 可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue
  7. Connection
  8. publisher/consumerbroker之间的TCP连接。
  9. Channel
  10. 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP
  11. Connection的开销将是巨大的,效率也较低。Channel是在 connection 内部建立的逻辑连接,如果应用程
  12. 序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客
  13. 户端和 message broker 识别 channel,所以channel之间是完全隔离的。Channel作为轻量级的
  14. Connection极大减少了操作系统建立TCP connection的开销。
  15. Exchange
  16. message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。
  17. 常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  18. Queue
  19. 消息最终被送到这里等待consumer取走。
  20. Binding
  21. exchangequeue之间的虚拟连接,binding 中可以包含routing keyBinding信息被保
  22. 存到exchange中的查询表中,用于 message 的分发依据。

安装Rabbit MQ

  1. rpm -ivh erlang-21.3-1.el7.x86_64.rpm
  2. yum install socat -y
  3. rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
  4. # 安装Web监控台插件
  5. rabbitmq-plugins enable rabbitmq_management

新建管理员用户

  1. 创建账号
  2. rabbitmqctl add_user admin 123
  3. 设置用户角色
  4. rabbitmqctl set_user_tags admin administrator
  5. 设置用户权限
  6. rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
  7. 用户user_admin具有/vhost1这个virtual host中所有资源的配置、写、读权限当前用户和角色
  8. rabbitmqctl list_users

RabbitMQ Server 管理命令

  1. 添加开机自启动RabbitMQ服务
  2. chkconfig rabbitmq-server on
  3. 启动服务
  4. /sbin/service rabbitmq-server start
  5. 查看服务状态
  6. /sbin/service rabbitmq-server status
  7. 停止服务(选择执行)
  8. /sbin/service rabbitmq-server stop

Java客户端案例

依赖

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.8.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>commons-io</groupId>
  8. <artifactId>commons-io</artifactId>
  9. <version>2.6</version>
  10. </dependency>

Provider

  1. public class Provider {
  2. //队列名称
  3. public static final String QUEUE_NAME = "hello";
  4. //发送消息到队列中
  5. public static void sendMS() {
  6. //创建一个连接工厂
  7. ConnectionFactory factory = new ConnectionFactory();
  8. //设置工厂IP 连接RabbitMQ的队列
  9. factory.setHost("47.172.193.131");
  10. factory.setPort(5672);
  11. factory.setUsername("admin");
  12. factory.setPassword("123");
  13. factory.setVirtualHost("/");
  14. Connection connection = null;
  15. Channel channel = null;
  16. try {
  17. //创建连接
  18. connection = factory.newConnection();
  19. //获取信道
  20. channel = connection.createChannel();
  21. /* 创建队列 queueDeclare
  22. * 1.队列名称
  23. * 2.队列中的消息是否持久化到磁盘中,默认在内存中
  24. * 3.该队列是否能被多个消费者共享
  25. * 4.最后一个消费者端口连接后是否自动删除队列
  26. * 5.其他参数[延迟队列,死信队列]
  27. */
  28. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  29. //发送消息
  30. String message = "hello world";
  31. /* 发送消息 basicPublish
  32. * 1.发送到那个交换机,空字符串表示默认交换机
  33. * 2.路由key值,可以是队列名称
  34. * 3.其他参数信息
  35. * 4.发送消息的消息体
  36. */
  37. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  38. } catch (IOException | TimeoutException e) {
  39. e.printStackTrace();
  40. } finally {
  41. if (channel != null) {
  42. try {
  43. channel.close();
  44. } catch (IOException | TimeoutException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. if (connection != null) {
  49. try {
  50. connection.close();
  51. } catch (IOException e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. }
  56. }
  57. }

Consumer

  1. public class Consumer {
  2. //队列名称
  3. public static final String QUEUE_NAME = "hello";
  4. //发送消息到队列中
  5. public static void getMS() {
  6. //创建一个连接工厂
  7. ConnectionFactory factory = new ConnectionFactory();
  8. //设置工厂IP 连接RabbitMQ的队列
  9. factory.setHost("47.172.193.131");
  10. factory.setPort(5672);
  11. factory.setUsername("admin");
  12. factory.setPassword("123");
  13. factory.setVirtualHost("/");
  14. Connection connection = null;
  15. Channel channel = null;
  16. try {
  17. connection = factory.newConnection();
  18. channel = connection.createChannel();
  19. /* 接收消息[消费者要处于监听的状态才会执行回调函数] basicConsume
  20. * 1.消费那个队列
  21. * 2.消费成功后是否要自动应答
  22. * 3.消费者未成功消费的回调
  23. * 4,消费者取消消费的回调
  24. */
  25. //消息消费成功回调函数
  26. DeliverCallback deliverCallback = (consumerTag, message) -> {
  27. System.out.println(new String(message.getBody()));
  28. };
  29. //取消消息时触发的回调函数
  30. CancelCallback cancelCallback = consumerTag -> {
  31. System.out.println("消息消费错误");
  32. };
  33. //channel绑定队列,autoAck为true表示一旦收到消息则自动回复确认消息
  34. channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
  35. } catch (IOException | TimeoutException e) {
  36. e.printStackTrace();
  37. }
  38. }
  39. }