消息中间件的概念以及和RPC的区别
概念:利用高效可靠的消息传递机制进行异步的数据传输,并基于数据通信进行分布式系统的集成。通过提供消息队列模型和消息传递机制,可以在分布式环境下扩展进程间的通信。
消息队列与RPC:
消息队列 | RPC | |
---|---|---|
模式 | 生产者消费者模式 | 请求响应模式 |
面向对象 | 面向数据 | 面向动作 |
使用场景 | 异步 | 一般是远程同步调用 |
有无存储 | 有broker来存储消息 | 没有存储,只有通信 |
级别 | 系统级、模块级的通信 | 对象级、函数级的通信 |
使用场景
- 解耦 : 一个业务的非核心流程需要依赖其他系统,但结果并不重要,有通知即可。
- 最终一致性 : 指的是两个系统的状态保持一致,可以有一定的延迟,只要最终达到一致性即可。经常用在解决分布式事务上。
- 广播 : 消息队列最基本的功能。生产者只负责生产消息,订阅者接收消息。
- 错峰和流控
各个消息中间件的对比
|
| Kafka | RocketMQ | RabbitMQ | | —- | —- | —- | —- | | 设计定位 | 用于日志收集和传输 | 非日志可靠消息传输。例如:订单、交易、充值、流计算、消息推送、流式处理、binglog分发等 | 可靠消息传输,类似RocketMQ | | 客户端SDK | Java, Scala, Go etc | Java, C++, Go | Java, .NET, C++ etc | | 持久化方式 | 磁盘文件 | 磁盘文件 | 内存、文件 | | 集群管理 | zk | name server | - | | 消息写入 性能 | 非常好,每条10KB测试,百万TPS | 很好,每条10KB测试,单机单broker7wTPS,单机3broker12wTPS | RAM为RocketMQ的1/2,DISK为RAM的1/3 | | 性能稳定性 | 队列/分区多时性能不稳定,明显下降,消息堆积时性能稳定 | 队列较多,消息堆积时性能稳定 | 消息堆积时,性能不稳定,明显下降 | | 事务消息 | 不支持 | 支持 | 不支持 | | 评价 | 主要用于日志收集和传输,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务 | 具有高吞吐量、高可用性、适合大规模分布式系统应用,适合需要事务支持的交易系统 | 在高吞吐量、高可用上不如Kafka,RocketMQ,但是由于erlang语言特性,使用RAM模式时,性能也比较好,适合中等规模的数据场景 |
ActiveMQ
- ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线,并且是一个完全支持JMS(Java Message Service)规范的消息中间件。
- 其丰富的API、多种集群构建模式使得他称为业界老牌消息中间件,在中小型企业中应用广泛!
-
JMS规范
Java消息服务(Java Message Server),是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间或分布式系统中发送消息,进行异步通信。
相关概念: 消息中间件(JMS Provider) : 指提供了对JMS协议的第三方组件,比如ActiveMQ就是一个消息中间件;
- 消息模式:分为点对点(Point to Point,即P2P)和发布/订阅(Pub/Sub),对应的数据结构分别是队列(Queue)和主题(Topic);
- 消息(Message):通信内容的载体,其结构主要分为消息头,属性和消息体,并且根据存储结构的不同分为好几种;
- 消息生产者:产生消息的一方,在P2P模式下,指消息发送者(Sender),在P/S模式下指消息发布者(Publisher);
- 消息消费者:接收消息的一方,对应于两种模式分别是消息接收者(Receiver)和消息订阅者(Subscriber) 。
使用方法:
public void testMQProducerQueue() throws Exception{
//1、创建工厂连接对象,需要制定ip和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://x.x.x.x:61616");
//2、使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
//3、开启连接
connection.start();
//4、使用连接对象创建会话(session)对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
Queue queue = session.createQueue("test-queue");
//6、使用会话对象创建生产者对象
MessageProducer producer = session.createProducer(queue);
//7、使用会话对象创建一个消息对象
TextMessage textMessage = session.createTextMessage("hello!test-queue");
//8、发送消息
producer.send(textMessage);
//9、关闭资源
producer.close();
session.close();
connection.close();
}
public void TestMQConsumerQueue() throws Exception{
//1、创建工厂连接对象,需要制定ip和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
//2、使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
//3、开启连接
connection.start();
//4、使用连接对象创建会话(session)对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
Queue queue = session.createQueue("test-queue");
//6、使用会话对象创建生产者对象
MessageConsumer consumer = session.createConsumer(queue);
//7、向consumer对象中设置一个messageListener对象,用来接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
//8、程序等待接收用户消息
System.in.read();
//9、关闭资源
consumer.close();
session.close();
connection.close();
}
RabbitMQ
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求在其次。
AMQP规范
AMQP全称:Advanced Message Queuing Protocol.
AMQP翻译:高级消息队列协议。
AMQP定义:二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
核心概念:
- Server:又称Broker,接受客户端的连接,实现AMQP实体服务。
- Connection:连接,应用程序与Broker的网络连接。
- Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务。
- Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则就是消息体内容。
- Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange和Queue。
- Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列。
- Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key。
- Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息。
Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者。
消息可靠性的保障
发送方确认模式:
- 将信道(传输途径)设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID,这个会被用于去重。
- 一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一ID)。如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(not acknowledged,未确认)消息。
接收方确认机制:
- 消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。
- 可以将消息持久化磁盘,防止丢失。
消息分发的方式
若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。通过路由可实现多消费的功能。消息路由的方式
消息提供方->路由->一至多个队列。
消息发布到交换器时,消息将拥有一个路由键(routing key)。通过队列路由键,可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配。
常用的交换器:
fanout:不需要路由键,如果交换器收到消息,将会广播到所有绑定的队列上,速度最快。
- direct:如果路由键完全匹配,消息就被投递到相应的队列。
topic:可以用通配符进行模糊匹配,使来自不同源头的消息能够到达同一个队列。
Broker和Cluster
broker 是指一个或多个 erlang node 的逻辑分组,且 node 上运行着 RabbitMQ 应用程序。cluster 是在 broker 的基础之上,增加了 node 之间共享元数据的约束。
死信队列
死信队列的来源:
消息被拒绝(basic.reject/basic.nack)并且requeue=false。
- 消息TTL过期。
- 队列达到最大长度。
利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。
使用方法:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("x.x.x.x");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//通过connection创建一个Channel
Channel channel = connection.createChannel();
//发送数据
String msg = "Hello!!!";
//The default exchange is implicitly bound to every queue, with a routing key equal to the queue name.
//It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.
channel.basicPublish("", "test001", null, msg.getBytes());
System.out.println("Sent '"+msg+ "'");
//关闭连接
channel.close();
connection.close();
}
}
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("x.x.x.x");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//通过connection创建一个Channel
Channel channel = connection.createChannel();
//创建队列
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//以对象的形式提供一个回调,它将缓冲消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
RocketMQ
RocketMQ是阿里开源的消息中间件,目前也已经孵化为Apache顶级项目,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,它对消息的可靠性传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
高可用的保障
- 多Master部署,防止单点故障。
-
消息的可靠性
如果消息无法使用,将重传(最多重传16次)。
-
NameServer与Broker
Name Server是一个几乎无状态(没有leader和follower)的节点,可集群部署,节点之间无任何信息同步,主要功能是为整个MQ集群提供服务协调与治理,具体就是记录维护Topic、Broker的信息,及监控Broker的运行状态。
- Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。
分布式事务的支持
A(存在DB操作)、B(存在DB操作)两方需要保证分布式事务一致性,通过引入中间层MQ,A和MQ保持事务一致性(异常情况下通过MQ反查A接口实现check),B和MQ保证事务一致(通过重试),从而达到最终事务一致性。
大事务=小事务+异步:
顺序性的保障
顺序消息(FIFO 消息)是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。顺序消息由两个部分组成:顺序发布和顺序消费。
顺序消息包含两种类型:
- 分区顺序:一个Partition内所有的消息按照先进先出的顺序进行发布和消费。
- 全局顺序:一个Topic内所有的消息按照先进先出的顺序进行发布和消费。
具体思路:
生产端将同一订单的消息路由到特定的分区。PullMessageService获取到消息后添加到ProcessQueue中,单线程执行,所以ProcessQueue中的消息是顺序的。消费者从ProcessQueue拿取消息,也是顺序的。
使用方式:
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
Kafka
Kafka是Linkedln开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
概念
- Broker kafka集群由一个或多个kafka server组成,每个server即Broker。Broker 是消息的代理,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉取指定Topic的消息,然后进行业务处理,broker在中间起到一个代理保存消息的中转站。
- Topic逻辑概念。kafka对消息保存时根据Topic进行归类,一个Topic可以认为是一类消息。
- Partition物理概念。每个topic将被分成一到多个partition(分区),每个partition在存储层面就是一个append log文件。一个非常大的topic可以分成多个partition,分布到多个broker上。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
- offset任何发布到Partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它是唯一标记一条消息。kafka并没有提供其他额外的索引机制来存储offset,因此在kafka中几乎不允许对消息进行“随机读写”。
- Consumer Group每个 consumer 属于一个特定的 consumer group(若不指定 group name 则属于默认的 group)。如果所有的consumer都具有相同的group, 即单播,消息将会在consumers之间负载均衡;如果所有的consumer都具有不同的group,那这就是”发布-订阅”,每条消息将会广播给所有的consumer。
高可用
Kafka确保高可用的策略有Data Replication和Leader Election。Leader Election
Kafka的选举模式并不是少数服从多数,而是在zookeeper中动态维护了一个ISR(In-Sync Replicas),ISR中所有Replica都跟上了Leader,只有ISR里的成员才有被选为Leader的可能
选Leader的方案是:所有Follower都在Zookeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时所有Follower都尝试创建该节点,而创建成功者(Zookeeper保证只有一个能创建成功)即是新的Leader,其它Replica即为Follower。Data Replication
Producer发布消息时,先找到Partition的Leader Replica,并发送消息;Leader写入Log,其他的Follower会pull数据,保证数据的同步性,然后向Leader发送ack确认(实际上,每个Follower在接受到数据就会发送ack,以提高性能),一旦Leader接收到了所有的ISR中的Follower的ack消息,就认为已经commit了。存活确认
kafka的存活条件包括两个条件:
- kafka必须维持着与zookeeper的session(这个通过zookeeper的heartbeat机制来实现)
- follower必须能够及时的将数据从leader复制过去 ,不能够“落后太多”。
kafka 如何保证消费者不会重复消费数据
kafka有个offset的概念,当每个消息被写进去后,都有一个offset,代表他的序号,然后consumer消费该数据之后,隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了。下次我要是重启,就会继续从上次消费到的offset来继续消费。
但是当我们直接kill进程了,再重启。这会导致consumer有些消息处理了,但是没来得及提交offset。等重启之后,少数消息就会再次消费一次。
消息确认机制
消息写入到所有副本的日志中才算提交,才可以被消费者消费。这是对消费者来说的,
生产者是否要等待消息都被写入所有副本之后才收到返回是另外一回事,这个可以通过acks来配置,kafka为生产者提供3种消息确认机制(request.required.acks参数):
- acks=0,生产者无需等待代理返回确认,就是可以连续发送,但是无法保证消息是否被代理收到。
- acks=1,生产者需要等待Leader副本成功写入日志。这种方式降级了消息丢失的可能性,但是也只是Leader写入日志而不管Follower是否写入。
acks=-1,Leader副本和所有Follower都写入日志才会向生产者发送确认信息。
Kafka为什么快
写入
Kafka写入性能快主要有两个原因: (1) 磁盘顺序写; (2)MMFile(1)磁盘顺序写: 收到数据之后直接顺序写入到Topic的每个partition对应的文件尾部,避免了文件的随即写,性能是随机写的1000倍;
- (2)MMFile: Memory Mapped Files(内存映射文件),MMFile的优点就是性能比普遍的文件读写快很多。原因如下:传统的文件读写都需要先把文件从硬盘copy到内核空间,然后再从内核空间copy到用户空间;而MMFile是直接把硬盘文件copy到内核空间,用户程序直接访问内核空间,所以会少一次copy的操作。但是MMFile的缺点是虽然内容已经写入到Page cache,但是并没有刷到磁盘上。所以要保证高可用的话,必须把数据刷到磁盘上才不会丢失。Kafka提供了同步刷盘和异步刷盘的配置。
消费
使用零拷贝,减少2次的用户空间和内核空间之间的copy操作。
怎么保证消息的顺序性
众所周知,一个topic可设置多个partition,partition分布在多个server,每个partition有一个leader,多个follower。以此实现写入高并发。消息可能被写入任一个partition,所以不能保证多个partition之间消息的存入顺序;更不能保证消息的消费顺序。那么kakfa是怎么保证一个partition顺序的?
- 一个topic,只设置一个partition。producer发送消息到kafka,kafka给每个生产者一个生产者PID,kafka单个partition保存的消息顺序格式即<
,sequnceId> 同一时间先后发送两条消息到kafka的同一partition,假如第一条message1发送失败,第二条message2发送成功并保存到kafka;紧接着第一条会重试并发送成功,如果这时partition保存,会造成消息顺序混乱。实际partition不会,判断的机制是message1=squenceId1,如果message1发送失败,第二条消息message2=squenceId2过来,判断当前sequence和sequence2序号差大于1(因为sequence1未保存),会丢弃这条消息;如果squence2保存了,sequence1又重发了一条,判断当前sequence1大于squence2,说明是重试的消息,也会丢弃。 - 发送消息可指定(topic, partition, key) 3个参数,即保证同一类消息只发送到同一个partition,保证写入顺序,同时也保证了消费顺序。
- 生产者客户端开启幂等性配置,broker会自动保证单分区消息不重复。
Nsq
Nsq是用go写的开源消息队列,有赞的Nsq基于开源框架做了自己的定制化。Nsq基本概念
NSQ由三个基本组件组成:
- nsqadmin:管理工具
- nsqd:这是真正的队列所在的进程,如果想要使用nsqlookupd的话,需要在启动的时候传入参数
- nsqlookupd:通过nsqlookupd可以注册和访问多个nsqd
nsqd可以在多个机器上部署,consumer通过nsqlookupd来连接到多个nsqd进行消费, nsqadmin连上nsqlookupd进行管理。
原生Nsq的缺陷
- 生产者不能动态发现nsq
- 数据缺少备份
- 无法实现顺序消费
1、生产者不能动态发现nsq
nsq官方推荐的集群策略,要求每个生产者都配置一个nsq,这样有两个问题:
- 每增加一个生产者,就要增加一个nsq,有点浪费
- 如果生产者配套的nsq挂了,这个生产者就不能发布消息
解决方案:让生产者像消费者那样,通过nsqlookup来动态查找nsq的消息。
2、数据缺少备份
nsq选择把消息放到内存中,只有当队列里消息的数量超过—mem-queue-size配置的限制时,才会对消息进行持久化,把消息保存到磁盘中,简称”刷盘“。
但是即使将—mem-queue-size设置为0,即每条消息都会保存到磁盘(当然这样会很影响效率),也不能保证数据的安全,一旦nsq节点磁盘受损,数据还是会丢失。
我们需要对数据进行复制,才能实现消息的真正可靠性。
有赞借鉴了Kafka的partition机制,把消息复制到多个nsq的partition中,比如topic A的消息,配置了两个partition,一个在nsq A,另一个在nsq C,那么一旦有新的topic A的消息被生产,消息就会被复制到这两个nsq中,原理和Kafka的一致,消息先被发布到leader partition,leader partition再把消息复制到其他partition。
同时,有赞也对进来nsq的消息,直接进行刷盘,不再等队列里消息的数量超过—mem-queue-size配置的限制时,才会对消息进行持久化,channel主动过来磁盘读取消息,下面是改造前和改造后,消息读取方式的对比:
3、无法实现顺序消费
有赞nsq实现顺序消费,同样借鉴了Kafka的实现思路:
- 相同ID的,只会去到相同的partition
- 调整并发消费策略, 保证同一时刻只会投递一条消息进行消费, 等待上一次ack成功后才继续投递下一条消息
虽然都是在做MQ,但是Nsq和Kafka在各自的领域设计上,有相同,也有不同,比如对于Consumer这个Domain对象,Kafka就有pull这个行为,但是Nsq没有,而对于MQ Server这个Domain,Nsq就有push行为,而Kafka没有,但是殊途同归,大家最后都成为了很牛逼的消息中间件。