Artemis采取了不同的方法。它只在内部实现队列,所有消息传送概念都是通过使用消息地址路由到适当的队列来实现的。如:发布订阅(topics)和点对点(queues)之类的消息传递概念是使用不同类型的路由机制实现。广播(Multicast)路由用于实现发布订阅(publish-subscribe)规程,所有订阅者将通过路由将获得自己的内部队列和消息。选播(Anycast)路由用于实现点对点规程,其中每一个地址只有一个队列,所有消费者都订阅它。所有协议都使用寻址和路由方案。例如,您可以将JMS topic作为广播地址,用于发送给消费者。
消息队列
点对点消息(queue):
通常将消息发送到队列,队列将消息持久化后,将消息推送给多个消费者,但只有其中一个能够接收成功。如果生产者在生产消息的时候,并无消费者在线,那么会将消息进行持久化,待消费者上线后,将消息发送给消费者。当消费者接收消息成功后,会隐式地发送一个ack给消息中间件,消息中间件收到ack后将消息从消息队列中标记为已消费并从队列中移除。
发布订阅(topic):
将消息发送到称为Topic的实体上,一个Topic可以有许多消费者(订阅者),每个订阅者都能收到发布者(生产者)的消息。生产订阅消息分为持久性订阅和非持久性订阅。
持久性订阅会将消息进行持久化而保存每一条消息,直到所有订阅者都收到了它们。
非持久订阅仅持续到创建它们的连接的最长生命周期。
消息传递可靠性,消息的可靠性有三种传输保障:
- At most once,至多一次,消息可能丢失但是不会重复发送;
- At least once,至少一次,消息不会丢失,但是可能会重复;
- Exactly once,精确一次,每条消息肯定会被传输一次且仅一次。
事务消息:
事务消息指的是针对事务开始和事务结束期间的全体操作,要不全部成功,要不全部失败。Artemis支持本地的多个消息的发送和确认事务。
消息持久化:
需要持久化的消息存储在磁盘中,不需要持久化的消息存储在内存中。
使用Artemis
首先我们我们下载并安装了broker,我们就会遇到与ActiveMQ第一个不同之处。使用Artemis,您需要显式地创建一个broker实例,而在ActiveMQ上,这个步骤是可选的。这个步骤的意思是将broker的安装和配置分开,这使得以后升级和维护代理更加容易。
所以,在启动Artemis之前,你需要执行这样的操作:
$ bin/artemis create clown /data/clown
不管你在什么操作系统上安装broker二进制文件,broker实例都会位于像“/opt/artemis”目录中。对每个ActiveMQ用户都熟悉这个文件夹的内容:
- bin – 包含管理broker的shell脚本(start, stop, etc.)
- data – 每一个broker数据存储 (message store)
- etc – 包含broker配置文件(it’s what conf directory is in ActiveMQ)
- log – Artemis logs存储目录, 而不像ActiveMQ将日志保存在data目录中
- tmp – 临时文件目录
现在让我们更详细地了解一下配置。
- etc/bootstrap.xml
- 文件用于设置基本信息,如:主broker配置文件的位置、实用程序(如Web服务器)和JAAS安全性。
- etc/broker.xml
- 与ActiveMQ’s中的conf/activemq.xml类似,用于配置broker的大部分信息,如:端口、名称、安全策略等。
- etc/artemis.profile
- 文件与ActiveMQ中的bin/env文件类似。您可以在这个文件中配置broker环境变量,主要:SSL、调试等相关的JVM常规参数。
- etc/logging.properties
- 两个brokers之间的日志配置没有多大差别,所以任何熟悉Java日志记录系统的人都会配置这个文件。
最后,我们有JAAS配置文件 (login.config, artemis-users.properties and artemis-roles.properties),它们与ActiveMQ中的配置相同。
现在我们可以启动Artemis:
$ bin/artemis run
Spring配置Artemis
maven配置
<!-- https://mvnrepository.com/artifact/org.apache.activemq/artemis-jms-client -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>2.7.0</version>
</dependency>
Java配置
private ActiveMQConnectionFactory factory;
private Connection connection;
@Autowired
private ActiveMQQueueListener listener;
public ActiveMQConfig() {
factory = new ActiveMQConnectionFactory(tcp_uri);
factory.setCacheDestinations(true);
try {
connection = factory.createConnection();
connection.start();
} catch (JMSException e) {
e.printStackTrace();
}
}
首先我们需要一个ActiveMQ的连接器工厂类,然后还需要连接到ActiveMQ。
@Bean(destroyMethod = "close")
public Session activeMQSession() throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
return session;
}
/**
* Queue生产者
*
* @param session
* @return
* @throws JMSException
*/
@Bean
public MessageProducer messageProducer(Session session) throws JMSException {
if (mqType == 0) {
return session.createProducer(session.createQueue(QUEUE_DEFAULT));
}
return session.createProducer(session.createTopic(TOPIC_DEFAULT));
}
这里我们需要创建一个生产者(MessageProducer),mqType是一个int对象,如:private int mqType = 1;//0:queue|1:topic|2:mqtt
这里为了做实验,便于切换,但是在实际应用场合中,我们应该是既使用Queue也会使用Topic,我将会在最后贴出ActiveMQService服务类,此类实现了对Artemis的基本操作。
/**
* Queue消费者
*
* @param session
* @return
* @throws JMSException
*/
@Bean
public MessageConsumer messageConsumer(Session session) throws JMSException {
MessageConsumer consumer = null;
if (mqType == 0) {
consumer = session.createConsumer(session.createQueue(QUEUE_DEFAULT));
} else {
consumer = session.createConsumer(session.createTopic(TOPIC_DEFAULT));
}
consumer.setMessageListener(listener);
return consumer;
}
最后我们只需要创建一个消费者(MessageConsumer),配置文件非常简单,现在可以用来测试。
@RestController
@RequestMapping(value = "/test")
public class TestActiveMQController {
@Autowired
private IActiveMQProducerService producerService;
@RequestMapping(value = "/activemq/queue")
public String testQueue(HttpServletRequest request) {
String str = "test queue " + DateTimeUtils.getTimeInt();
producerService.send(str);
return str;
}
@RequestMapping(value = "/activemq/mqtt")
public String testMqtt(HttpServletRequest request) {
String str = "test mqtt " + DateTimeUtils.getTimeInt();
producerService.push(str);
return str;
}
}
//生产者服务类
@Service
public class ActiveMQProducerServiceImpl extends CompactService implements IActiveMQProducerService {
@Autowired
private MessageProducer producer;
@Autowired
private Session session;
@Autowired
private BlockingConnection blockingConnection;
public ActiveMQProducerServiceImpl() {
}
/**
* Queue & Topic send
*
* @param message
*/
@Override
public void send(String message) {
try {
producer.send(session.createTextMessage(message));
} catch (JMSException e) {
e.printStackTrace();
}
}
/**
* MQTT push
*
* @param message
*/
@Override
public void push(String message) {
try {
blockingConnection.publish(ActiveMQConfig.MQTT_DEFAULT, message.getBytes(), QoS.AT_LEAST_ONCE, false);
} catch (Exception e) {
e.printStackTrace();
}
}
}
为了测试,随便整合了一下,但是后来觉得,其实没有必要使用config的方式进行整合,以为Apache ActiveMQ Artemis是一个非常大的消息队列,应该以服务层的概念来整合进去,所以后来写了一个Artemis服务对象类:
整合到服务层
接口IActiveMQService
import javax.jms.*;
public interface IActiveMQService {
Session createSession() throws JMSException;
Queue createQueue(Session session, String name) throws JMSException;
Topic createTopic(Session session, String name) throws JMSException;
MessageProducer getMessageProducer(Queue queue) throws JMSException;
MessageProducer getMessageProducer(Topic topic) throws JMSException;
MessageProducer createMessageProducer(Destination destination, String name) throws JMSException;
MessageConsumer getMessageConsumer(Session session, Queue queue) throws JMSException;
MessageConsumer getMessageConsumer(Session session, Topic topic) throws JMSException;
MessageConsumer createMessageConsumer(Session session, Destination destination, String name) throws JMSException;
boolean send(String name, String type, String message);
boolean send(String name, String type, String message, CompletionListener listener);
boolean send(MessageProducer producer, String message);
boolean send(MessageProducer producer, String message, CompletionListener listener);
void addListener(String name, String type, MessageListener listener);
}
实现ActiveMQService:
import com.lanxinbase.system.basic.CompactService;
import com.lanxinbase.system.service.resource.IActiveMQService;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;
import javax.jms.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class ActiveMQService extends CompactService implements IActiveMQService, InitializingBean, DisposableBean {
public static final String TYPE_QUEUE = "queue";
public static final String TYPE_TOPIC = "topic";
private static final String TOPIC_DEFAULT = "topic/default/";
private static final String QUEUE_DEFAULT = "queue/default/";
private static final String tcp_uri = "tcp://0.0.0.0:61616";
private ActiveMQConnectionFactory factory;
private Connection connection;
private Session session;
private Map<String, MessageProducer> producerMap = new ConcurrentHashMap<>();
public ActiveMQService() {
}
/**
* 首次启动运行
*
* @throws Exception
*/
@Override
public void afterPropertiesSet() throws Exception {
factory = new ActiveMQConnectionFactory(tcp_uri);
factory.setCacheDestinations(true);
connection = factory.createConnection();
connection.start();
createSession();
}
/**
* 私有的session,主要用于生产者
*
* @return
* @throws JMSException
*/
private Session getSession() throws JMSException {
if (session == null) {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
return session;
}
/**
* 创建一个新的session,主要应用于消费者
*
* @return
* @throws JMSException
*/
@Override
public Session createSession() throws JMSException {
return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
/**
* 创建一个Queue
*
* @param s session
* @param name 如:test
* @return
* @throws JMSException
*/
@Override
public Queue createQueue(Session s, String name) throws JMSException {
if (name == null) {
name = "";
}
return s.createQueue(QUEUE_DEFAULT + name);
}
/**
* 创建一个Topic
*
* @param s session
* @param name 如:test
* @return
* @throws JMSException
*/
@Override
public Topic createTopic(Session s, String name) throws JMSException {
if (name == null) {
name = "";
}
return s.createTopic(TOPIC_DEFAULT + name);
}
/**
* 获取一个生产者
*
* @param queue
* @return
* @throws JMSException
*/
@Override
public MessageProducer getMessageProducer(Queue queue) throws JMSException {
return createMessageProducer(queue, queue.getQueueName());
}
/**
* 获取一个生产者
*
* @param topic
* @return
* @throws JMSException
*/
@Override
public MessageProducer getMessageProducer(Topic topic) throws JMSException {
return createMessageProducer(topic, topic.getTopicName());
}
/**
* 创建一个生产者
*
* @param destination
* @param name 缓存key,同时等于queue&topic的name
* @return
* @throws JMSException
*/
@Override
public MessageProducer createMessageProducer(Destination destination, String name) throws JMSException {
logger(name);
MessageProducer producer = producerMap.get(name);
if (producer == null) {
producer = session.createProducer(destination);
producerMap.put(name, producer);
}
return producer;
}
/**
* 获取一个消费者
*
* @param s 可以通过createSession创建
* @param queue
* @return
* @throws JMSException
*/
@Override
public MessageConsumer getMessageConsumer(Session s, Queue queue) throws JMSException {
return createMessageConsumer(s, queue, queue.getQueueName());
}
/**
* 获取一个消费者
*
* @param s 可以通过createSession创建
* @param topic
* @return
* @throws JMSException
*/
@Override
public MessageConsumer getMessageConsumer(Session s, Topic topic) throws JMSException {
return createMessageConsumer(s, topic, topic.getTopicName());
}
/**
* 创建一个消费者
*
* @param s session(本来是把session做成单例,但是消费者应该是动态的,不同于生产者,所以这里需要随时创建一个session)
* @param destination
* @param name 废弃
* @return
* @throws JMSException
*/
@Override
public MessageConsumer createMessageConsumer(Session s, Destination destination, String name) throws JMSException {
// MessageConsumer consumer = consumerMap.get(name);
MessageConsumer consumer = null;
// if (consumer == null) {
// consumer = session.createConsumer(destination);
// consumerMap.put(name, consumer);
// }
consumer = s.createConsumer(destination);
return consumer;
}
/**
* 生产一条消息
*
* @param name Queue|Topic的name
* @param type 类型:Queue|Topic
* @param message 字符串消息,通常应该是JSON字符串
* @return
*/
@Override
public boolean send(String name, String type, String message) {
return this.send(name, type, message, null);
}
@Override
public boolean send(String name, String type, String message, CompletionListener listener) {
MessageProducer producer;
try {
if (TYPE_QUEUE.equals(type)) {
producer = getMessageProducer(createQueue(getSession(), name));
} else {
producer = getMessageProducer(createTopic(getSession(), name));
}
if (listener == null) {
return this.send(producer, message);
} else {
return this.send(producer, message, listener);
}
} catch (JMSException e) {
e.printStackTrace();
}
return false;
}
/**
* 发送消息
*
* @param producer 生产者
* @param message 消息字符串
* @return
*/
@Override
public boolean send(MessageProducer producer, String message) {
try {
producer.send(session.createTextMessage(message));
return true;
} catch (JMSException e) {
e.printStackTrace();
}
return false;
}
@Override
public boolean send(MessageProducer producer, String message, CompletionListener listener) {
try {
producer.send(session.createTextMessage(message), listener);
return true;
} catch (JMSException e) {
e.printStackTrace();
}
return false;
}
/**
* 消费者监听
*
* @param name Queue|Topic的name
* @param type 类型:Queue|Topic
* @param listener 监听回调
*/
@Override
public void addListener(String name, String type, MessageListener listener) {
MessageConsumer consumer = null;
try {
Session session = createSession();
if (TYPE_QUEUE.equals(type)) {
consumer = getMessageConsumer(session, createQueue(session, name));
} else {
consumer = getMessageConsumer(session, createTopic(session, name));
}
this.addListener(consumer, listener);
} catch (JMSException e) {
e.printStackTrace();
}
}
private void addListener(MessageConsumer consumer, MessageListener listener) {
try {
consumer.setMessageListener(listener);
} catch (JMSException e) {
e.printStackTrace();
}
}
/**
* 程序退出时需要关闭或停止连接
*
* @throws Exception
*/
@Override
public void destroy() throws Exception {
if (session != null) {
session.close();
}
connection.stop();
connection.close();
factory.close();
}
}
由于每个方法都加了注释,所以这里就不更多的废话了。但是在public MessageConsumer createMessageConsumer()方法中我使用的是创建了一个全新的Session来创建消费者,为什么要这样子做呢?
在生产环境中应该跟createMessageProducer()方法中的session一样使用单列的session,这里需要注意就是这个session我并没有研究过有没有过期时间,在生产环境中,需要实时传输消息,理论上是不会过期。
之所以这样做,我认为在用户登陆APP上线之后会执行一个会话(创建一个新的session),当APP下线之后也应该移除这个Session,示例代码只是为了做测试,所以没有过多的实现,在实际应用中应该根据自身的业务需求来改进逻辑代码。
最后测试TestActiveMQController:
@RestController
@RequestMapping(value = "/test")
public class TestActiveMQController {
private static final Logger logger = Logger.getLogger("TestActiveMQController>");
@Resource
private IActiveMQService activeMQService;
/**
* /test/activemq/pub/add?name=testQueue&type=queue
* /test/activemq/pub/add?name=testQueue1&type=queue
* <p>
* /test/activemq/pub/add?name=testTopic&type=topic
* /test/activemq/pub/add?name=testTopic1&type=topic
*
* @param request
* @return
*/
@RequestMapping(value = "/activemq/pub/add")
public ResultResp<Void> pubAdd(HttpServletRequest request) {
ResultResp<Void> resp = new ResultResp<>();
String name = request.getParameter("name");
String type = request.getParameter("type");
String msg = "test mqtt " + DateTimeUtils.getTimeInt();
activeMQService.send(name, type, msg);
resp.setInfo(msg);
return resp;
}
/**
* /test/activemq/sub/add?id=100&name=testQueue&type=queue
* /test/activemq/sub/add?id=101&name=testQueue1&type=queue
* <p>
* /test/activemq/sub/add?id=100&name=testTopic&type=topic
* /test/activemq/sub/add?id=101&name=testTopic&type=topic
*
* @param request
* @return
*/
@RequestMapping(value = "/activemq/sub/add")
public ResultResp<Void> subAdd(HttpServletRequest request) {
ResultResp<Void> resp = new ResultResp<>();
String id = request.getParameter("id");
String name = request.getParameter("name");
String type = request.getParameter("type");
activeMQService.addListener(name, type, (m) -> {
TextMessage textMessage = (TextMessage) m;
try {
logger.info(id + " : " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
});
return resp;
}
}
SpringBoot配置Artemis
POM文件
// 引入activemq
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
// 引入artemis
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis</artifactId>
</dependency>
配置文件
配置activemq
spring:
activemq:
broker-url: tcp://${ACTIVEMQ_HOST, localhost}:61616 # activemq连接地址
user: ${ACTIVEMQ_USER, admin} # 用户名
password: ${ACTIVEMQ_PASSWORD, admin} # 密码
send-timeout: # 发送超时时间
pool:
enabled: false # 是否创建 JmsPoolConnectionFactory 连接池
idle-timeout: 30s # 空闲连接超时时间
max-connections: 50 # 连接池中最大连接数
max-sessions-per-connection: 100 # 每个连接最大会话
配置artemis
spring:
artemis:
mode: native
host: ${ARTEMIS_HOST, localhost} # artermis连接地址
port: ${ARTEMIS_PORT, 9876} # artermis连接端口
user: ${ARTEMIS_USER, admin} # 用户名
password: ${ARTEMIS_PASSWORD, admin} # 密码
send-timeout: # 发送超时时间
pool:
enabled: false # 是否创建 JmsPoolConnectionFactory 连接池
idle-timeout: 30s # 空闲连接超时时间
max-connections: 50 # 连接池中最大连接数
max-sessions-per-connection: 100 # 每个连接最大会话
Spring JMS 中默使用 CachingConnectionFactory 创建连接池,如果指定 JmsPoolConnectionFactory 连接池,测在 spring.jms.* 中配置连接池属性。
spring:
jms:
session-cache-size: 5
# 是否允许发布订阅
pub-sub-domain: true
关于 Activemq (Artemis) 和 Apring Jms 更多配置,可参考:Spring Boot Integration Properties
使用
JmsTemplate 是 Spring JMS 中用来发送消息的类。做完以上配置,在 Spring Boot 启动入口加上 @EnableJms 注解,项目在启动时会自动装配,在项目中直接注入 JmsTemplate 对象。
@SpringBootApplication
@EnableJms
public class Application {
@Autowired
private JmsTemplate jmsTemplate;
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
发送文本消息示例:
public void send(String msg) {
log.info("发送消息:{}", msg);
jmsTemplate.send(DEST_NAME, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
// 也可以创建对象 session.createObjectMessage()
TextMessage textMessage = session.createTextMessage();
textMessage.setText(msg);
return textMessage;
}
});
}
接收使用 @JmsListener 注解:
/**
* 监听消息
* @param content
*/
@JmsListener(destination = DEST_NAME, concurrency = "消费线程数")
public void recive(String content) {
log.info("收到消息:{}", content);
}
发送消息和接收消息的 DEST_NAME 要保持一至。 concurrency 属性非必须,用来设置消费消息的程序数。关于 ActiveMQ 的详细介绍会在专门的章节讲解。
Producer
@Component
public class Producer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
public void send(Destination destination, final String message) {
jmsMessagingTemplate.convertAndSend(destination, message + "from queue");
}
@JmsListener(destination="out.queue")
public void consumerMessage(String text){
System.out.println("从out.queue队列收到的回复信息为:"+text);
}
}
Consumer
@Component
public class Consumer {
@JmsListener(destination = "mytest.queue")
@SendTo("out.queue")
public String receiveQueue(String text) {
System.out.println("Consumer收到的信息为:"+text);
return "return message "+text;
}
}
Rest使用
@ResponseBody
@RequestMapping(value = "/mqtest", method = RequestMethod.GET)
public Object mqtest() {
Destination destination = new ActiveMQQueue("mytest.queue");
producer.send(destination, "I am YeJiaWei");
return "OK";
}