消息队列是一种异步的服务间通信方式,是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
消息服务介绍和使用场景
什么是AMQP: 即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端中间件不同产品,不同的开发语言等条件的限制。
什么是JMS:java消息服务(Java Message Service),Java平台中关于面向消息中间件的接口
JMS是一种与厂商无关的API。
使用场景:
核心应用:
解耦:订单系统 >> 物流系统
异步:用户注册 >> 发送邮件,初始化信息
削峰:秒杀、日志处理
跨平台、多语言
分布式事务、最终一致性
RPC调用上下游对接、数据源变动》通知下属
主流消息中间件框架对比
- Apache ActiveMQ
Apache出品,历史悠久,支持多种语言的客户端和协议Java,NET,c++等,基于JMS Provider实现。吞吐量不高,多队列的时候性能下降,存在消息丢失的情况,比较少大规模使用。 - Kafka
是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafaka是一种高吞吐量的分布式发布订阅消息系统,它可以处理大规模的网站中的所有动作流数据(网页浏览,搜索和其他用户行动),副本集机制,实现数据冗余,保持数据尽量不丢失,支持多个生产者和消费者。不支持批量和广播消息,运维难度大。 - RabbitMQ
是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端。。。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方便表现不错。使用Erlang开发,阅读和修改源码难度大 RocketMQ
阿里开源的消息中间件,Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点,性能强劲(零拷贝技术),支持海量堆积,支持指定次数和时间间隔的失败消息重发,支持consumer端tag过滤,延迟消息等,在阿里内部大规模使用,适合在电商、互联网金融等领域使用。因为是阿里内部从实践到产品的产物,因此里面很多接口、API并不是很普遍适用。
RocketMQ介绍
是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
具有以下特点:能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
-
RocketMQ安装与启动
服务器安装下载: https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip解压:unzip rocketmq-all-4.7.1-bin-release.zip根据机器内存情况设置运行内存:
- 修改runserver.sh
JAVA_OPT=”${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m” - 修改runbroker.sh
JAVA_OPT=”${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g” - 修改tools.sh
JAVA_OPT=”${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m”
运行 name server
nohup sh bin/mqnamesrv &
查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
运行broker
nohup sh bin/mqbroker -n localhost:9876 &
nohup sh bin/mqbroker -c 配置文件 &
查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log
关闭服务
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
要想完全清空数据,删除文件夹~/store,然后重启
- 修改runserver.sh
- 控制台安装
下载: git clone https://github.com/apache/rocketmq-externals.git
找到rocketmq-console/src/main/resources/application.properties 根据需求,修改配置
server.port=8081
name server地址
修改 pom.xml ,修改RocketMQ相关依赖的版本
4.7.1
切换到控制台目录 cd rocketmq-console
mvn clean package -DskipTests