ActiveMQ遵循jms规范,但是只有简单的点对点模式和发布订阅模式,并发高并发性能也不好;
RabbitMQ是使用天生为高并发而生的Erlang语言编写的,支持高并发,支持多种路由策略,fanout给所有与交换机绑定的消费者发消息,derect根据routingkey来给消费队列发送消息,topic在路由策略中增加了*和#的表达式,以及发送消息时候有支持事务,在点对点钟还支持公平类型的,根据服务器的性能来分配需要消费的信息;confim机制保证发送消息的可靠性,手动应答机制保证消费者的安全性,以及死性队列的应用可以对一些异常情况做一些同一处理,比如定时job检查。同时RabbitMQ可以处理分布式事务;
MQ:消息中间件
功能:
解耦:
当揽收系统揽收了一个快递,那么会将这个邮件的信息同步到订单组,财务组,投递组,分发系统,中转系统
如果调用接口的话就将两个系统的耦合度提高,如果应用了MQ就会让耦合度降低;
异步:
当我外系统调用我们系统复杂业务接口的时候就会,现将请求信息插入到一张表里,然后将请求表中id放到mq中,写相应消费者接口去处理这些请求,因为这些请求时同步的,如果请求时间较长的话就会请求产生阻塞,不利于客户端体验;
流量消峰:
就像上边提到的情况,如果同一时间来了大量请求,就可以能产生大量请求产生等待的现象,这样很可能让服务挂掉,为保证服务安全,将请求先缓存到mq队列中保护了服务器的安全;实现高性能、高可用;
什么是JMS:
JMS是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输;
JMS通讯模式
发布与订阅:一对多,主题topic,如果消费者是集群的话,对于每条消息每个消费者都会进行消费;
消息队列:点对点,队列queue,如果消费者是集群的话,均摊消费;
1、点对点
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
</dependencies>
生产者:
/**
* mq通讯地址
*/
private final static String URL = "tcp://localhost:61616";
/**
* 队列名称
*/
private final static String QUEUENAME = "my_queue";
public static void main(String[] args) throws JMSException {
// 1.创建ActiveMQFactory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2.创建连接
Connection cnnection = factory.createConnection();
// 3.启动连接
cnnection.start();
// 4.创建Session 不开启事务,自动签收模式
Session session = cnnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建一个目标
Queue queue = session.createQueue(QUEUENAME);
// 6.创建生产者
MessageProducer producer = session.createProducer(queue);
for (int i = 1; i <= 10; i++) {
// 7.创建消息
TextMessage textMessage = session.createTextMessage("消息" + i);
// 8.发送消息
producer.send(textMessage);
System.out.println(textMessage.toString());
}
// 9.关闭连接
cnnection.close();
}
消费者
public class Consumer {
/**
* mq通讯地址
*/
private final static String URL = "tcp://localhost:61616";
/**
* 队列名称
*/
private final static String QUEUENAME = "my_queue";
public static void main(String[] args) throws JMSException {
// 1.创建ActiveMQFactory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2.创建连接
Connection cnnection = factory.createConnection();
// 3.启动连接
cnnection.start();
// 4.创建Session 不开启事务,自动签收模式
Session session = cnnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建一个目标
Queue queue = session.createQueue(QUEUENAME);
// 6.创建生产者
MessageConsumer createConsumer = session.createConsumer(queue);
createConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("消费者消费消息:" + textMessage.getText());
} catch (Exception e) {
// TODO: handle exception
}
}
});
// 先不要关闭连接
}
}
2、发布与订阅
public class Producter2 {
/**
* mq通讯地址
*/
private final static String URL = "tcp://localhost:61616";
/**
* 主题名称
*/
private final static String TOPICNAME = "my_topic";
public static void main(String[] args) throws JMSException {
// 1.创建ActiveMQFactory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2.创建连接
Connection cnnection = factory.createConnection();
// 3.启动连接
cnnection.start();
// 4.创建Session 不开启事务,自动签收模式
Session session = cnnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建一个目标
Topic createTopic = session.createTopic(TOPICNAME);
// 6.创建生产者
MessageProducer producer = session.createProducer(createTopic);
// 设置消息持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 1; i <= 10; i++) {
// 7.创建消息
TextMessage textMessage = session.createTextMessage("消息" + i);
// 8.发送消息
producer.send(textMessage);
System.out.println(textMessage.toString());
}
// 9.关闭连接
cnnection.close();
}
}
public class Consumer2 {
/**
* mq通讯地址
*/
private final static String URL = "tcp://localhost:61616";
/**
* 主题名称
*/
private final static String TOPICNAME = "my_topic";
public static void main(String[] args) throws JMSException {
// 1.创建ActiveMQFactory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2.创建连接
Connection cnnection = factory.createConnection();
// 3.启动连接
cnnection.start();
// 4.创建Session 不开启事务,自动签收模式
Session session = cnnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建一个目标
Topic topic = session.createTopic(TOPICNAME);
// 6.创建生产者
MessageConsumer createConsumer = session.createConsumer(topic);
createConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("消费者消费消息:" + textMessage.getText());
} catch (Exception e) {
// TODO: handle exception
}
}
});
// 先不要关闭连接
}
}