消息

一个数据传输单位,它包含了创建时间、通道/主题信息、输入参数等全部数据

在实际开发中,我们说的消息指的是java通过JMS发送的消息

JMS

Java提供了一套标准JMS(java message service)

引自sun公司JMS规范文档

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

JMS允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

我们可以简单的理解:两个应用程序之间需要进行通信,我们使用一个JMS服务,进行中间的转发,通过JMS 的使用,我们可以解除两个程序之间的耦合。【目前严格遵循JMS标准的消息中间件就只有ActiveMQ】

JMS只是定义了一组有关消息传送的规范和标准,并没有真正实现,也就说JMS只是定义了一组接口而已

体系架构

1.JMS提供者
  连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。

2.JMS客户
   生产或消费消息的基于Java的应用程序或对象。

  3.JMS生产者
   创建并发送消息的JMS客户。

  4.JMS消费者
   接收消息的JMS客户。

  5.JMS消息
   可以在JMS客户之间传递的数据

JMS消息结构

JMS消息由三部分组成:

消息头

通常包含一些消息的标准描述信息和用于识别和为消息寻找路由的操作设置

下面是一些标准的消息头描述信息:

JMSDestination 消息的目的地,TOPIC或是QUEUE
JMSDeliveryMode 消息的发送模式:persistentnonpersistent。前者表示消息在被消费之前,如果JMS提供者(如active mq)DOWN了,重新启动后消息仍然存在。后者在这种情况下表示消息会被丢失
JMSTimestamp 消息发送时间,当调用send()方法的时候,JMSTimestamp会被自动设置为当前时间
JMSExpiration 表示一个消息的有效期。只有在这个有效期内,消息消费者才可以消费这个消息。默认值为0,表示消息永不过期。
JMSPriority 消息的优先级。0-4为正常的优先级,5-9为高优先级。
JMSMessageID 一个字符串,用来唯一标示一个消息
JMSReplyTo 有时消息生产者希望消费者回复一个消息,JMSReplyTo为一个Destination,表示需要回复的目的地。当然消费者可以不理会它
JMSCorrelationID 通常用来关联多个Message。例如需要回复一个消息,可以把JMSCorrelationID设置为所收到的消息的JMSMessageID
JMSRedelivered 如果这个值为true,表示消息是被重新发送了。因为有时消费者没有确认他已经收到消息或者JMS提供者不确定消费者是否已经收到
JMSType 表示消息体的结构,和JMS提供者有关

消息属性

包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
相当于补充消息头没有的内容,包括三种类型属性:

  • 应用专有属性—为消息增加应用专有的头字段提供的机制。

  • 标准属性—JMS 定义的一些标准属性,它们相当于可选的头字段。

  • 提供商专有属性—在集成 JMS 客户端和JMS 提供商本地客户端时可能会用到提供商专有的属性。JMS 为这些属性定义了命名规范。

消息体

允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。消息接口非常灵活,并提供了许多方式来定制消息的内容。

五种消息主体形式

JMS提供五种消息主体的形式,每种形式通过消息接口定义:

  • StreamMessage

消息体包含java原始值数据流,它是连续地被填充和读取的。

  • MapMessage

消息主体包含键值对集合,其中键为字符串,值为Java原生类型。条目访问可被计算器连续地或者名称随机地访问,它的顺序并不一定。

  • TextMessage

消息主体包含Java String 对象

  • ObjectMessage

消息主体包含一个Serializable 对象,如果需要使用集合对象,确保JDK 1.2或更高。

  • BytesMessage

消息主体包含连续字节流

两种消息模型

在JMS标准中,有两种消息模型P2P(Point to Point),Pub/Sub(publish/subscribe)

点对点模型(Point to Point)

点对点模型包括三个角色

  • 消息队列(Queue):存放消息的队列
  • 发送者(Sender):发送消息的一方【对应JMS生产者】
  • 接收者(Receiver):消费消息的一方【对应JMS消费者】

每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
image.png
特性:

每个队列只能有一个生产者和消费者**

每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)。

发送者和接收者之间在时间上没有依赖性,当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。

接收者在成功接收消息之后需向队列应答成功

应用场景:需要确保每一条消息都被成功消费的场景【如QQ好友聊天,不管对方是否在线,都需要发送到队列等待对方消费】

发布-订阅模型(publish/subscribe)

发布-订阅模型包括如下三个角色:

  • 主题(Topic):即消息
  • 发布者(Publisher):发送消息的一方【对应JMS生产者】
  • 订阅者(Subscriber):消费消息的一方【对应JMS消费者】

多个发布者将消息发送到Topic,系统将这些消息传递(Delivers)给多个订阅者

image.png
特性:
每个消息可以有多个消费者。

发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,只能消费自它订阅之后发布的消息,为了消费消息,订阅者必须保持运行的状态。

JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。**持久订阅允许消费者消费它在未处于激活状态时发送的消息。

应用场景:如果希望发送的消息可以被一个/多个消费者消费,那么可以采用Pub/Sub模型【如QQ群聊】

两种消费方式

在JMS中,消息的产生和消费都是异步的。对于消费者(或者订阅者)而言,可以通过两种方式去消费:

  • 同步:订阅者或接收者通过receive方法来接收消息,receive方法在接收到消息之前将一直阻塞

  • 异步:订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法

JMS提供的公共API接口

ConnectionFactory 接口(连接工厂)

创建Connection对象的工厂,根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂,分别有QueueConnectionFactoryTopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。

Connection 接口(连接)

表示在客户端和JMS系统之间建立的链接(对TCP/IP socket包装)
  Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

Destination 接口(目标)

目标是一个包装了消息目标标识符的对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。

消息生产者的消息发送目标或者说消息消费者的消息来源

对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);
  对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。
  
所以,Destination实际上就是两种类型的对象:**QueueTopic**,可以通过JNDI来查找Destination。

Session 接口(会话)

Session是我们操作消息的接口,表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。

可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当我们需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。

同样,也分QueueSessionTopicSession

MessageProducer 接口(消息生产者)

由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。

同样,消息生产者分两种类型:QueueSenderTopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

MessageConsumer 接口(消息消费者)

由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。

同样,消息消费者分两种类型:QueueReceiverTopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。
  
当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

Message 接口(消息)

是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。

JMS简单使用

此处我们使用ActiveMQ来学习(因为它是最严格遵守JMS协议的JMS提供者)【关于ActiveMQ的内容可以查看这篇文章

搭建工程,引入如下两个依赖:

  1. <!-- JMS规范的jar依赖 -->
  2. <dependency>
  3. <groupId>javax.jms</groupId>
  4. <artifactId>javax.jms-api</artifactId>
  5. <version>2.0.1</version>
  6. </dependency>
  7. <!-- activeMQ对jms具体实现的jar依赖,我这里选用了对应版本5.16.0的jar包 -->
  8. <dependency>
  9. <groupId>org.apache.activemq</groupId>
  10. <artifactId>activemq-client</artifactId>
  11. <version>5.16.0</version>
  12. </dependency>

点对点模型实例

(1)编写点对点模型(又称队列模型)生产者

  1. public class ActiveMQSender {
  2. // ActiveMQ连接地址
  3. public static final String BROKER_URL = "tcp://127.0.0.1:61616";
  4. // 连接账密
  5. public static final String USERNAME = "admin";
  6. public static final String PASSWORD = "admin";
  7. //相当于一个数据库(其实是一个队列)
  8. public static final String DESTINATION = "myQueue";
  9. public static void main(String[] args) {
  10. // 连接工厂
  11. ConnectionFactory connectionFactory;
  12. // 连接
  13. Connection connection = null;
  14. // 会话,接受或者发送消息的线程
  15. Session session;
  16. // 消息的目的地
  17. Destination destination;
  18. // 消息生产者
  19. MessageProducer messageProducer;
  20. try {
  21. // 实例化工厂
  22. connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
  23. // 获取连接
  24. connection = connectionFactory.createConnection(USERNAME, PASSWORD);
  25. // 启动连接
  26. connection.start();
  27. // 创建session(非事务模式,自动确认消息)
  28. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  29. // 创建一个消息队列,即目的地
  30. destination = session.createQueue(DESTINATION);
  31. // 创建消息生产者
  32. messageProducer = session.createProducer(destination);
  33. do{
  34. System.out.println("请发布内容:");
  35. messageText= scanner.nextLine();
  36. // 发送消息,没有返回值,是非阻塞的
  37. sendMessage(session, messageProducer,messageText);
  38. }while(!messageText.equals("拜拜"));
  39. } catch (JMSException e) {
  40. // TODO Auto-generated catch block
  41. e.printStackTrace();
  42. }
  43. }
  44. /**
  45. * 发送消息
  46. * @param session 会话
  47. * @param messageProducer 生产者对象
  48. * @param messageText 消息内容
  49. * @throws JMSException
  50. */
  51. public static void sendMessage(Session session,MessageProducer messageProducer,String messageText) throws JMSException{
  52. TextMessage message = session.createTextMessage(messageText);
  53. //发出消息
  54. messageProducer.send(message);
  55. }
  56. }

(2)编写点对点模型(又称队列模型)的消费者

  1. public class ActiveMQReceiver {
  2. // ActiveMQ连接地址
  3. public static final String BROKER_URL = "tcp://127.0.0.1:61616";
  4. // 连接账密
  5. public static final String USERNAME = "admin";
  6. public static final String PASSWORD = "admin";
  7. //相当于一个数据库(其实是一个队列)
  8. public static final String DESTINATION = "myQueue";
  9. public static void main(String[] args) {
  10. // 连接工厂
  11. ConnectionFactory connectionFactory;
  12. // 连接
  13. Connection connection = null;
  14. // 会话,接受或者发送消息的线程
  15. Session session = null;
  16. // 消息的目的地
  17. Destination destination;
  18. // 消息消费者
  19. MessageConsumer messageConsumer = null;
  20. try {
  21. // 实例化工厂
  22. connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
  23. // 获取连接
  24. connection = connectionFactory.createConnection(USERNAME, PASSWORD);
  25. // 接收消息,需要将连接启动一下,才可以接收到消息
  26. connection.start();
  27. // 创建一个Session 第一个参数:是否是事务消息 第二个参数:消息确认机制(自动确认还是手动确认)
  28. session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
  29. // 有了session之后,就可以创建消息,目的地,生产者和消费者
  30. destination = session.createQueue(DESTINATION);
  31. // 消费者
  32. messageConsumer = session.createConsumer(destination);
  33. // 循环接收消息
  34. while (true){
  35. // 接收消息 有返回值,是阻塞的
  36. Message message = messageConsumer.receive();
  37. // 判断消息类型
  38. if(message instanceof TextMessage){
  39. String text = ((TextMessage) message).getText();
  40. System.out.println("生产者发来消息:"+text);
  41. }
  42. }
  43. } catch (JMSException e) {
  44. // TODO Auto-generated catch block
  45. e.printStackTrace();
  46. }
  47. }
  48. }

(3)运行生产者实例ActiveMQSender,连接MQ并发送消息:
image.png

ActiveMQ管理后台,点对点模型页面出现一个队列,名称就叫myQueue,待消费消息1条,总入队消息1条
image.png
(4)启动消费者实例ActiveMQReceiver,连接MQ并消费消息:
image.png
ActiveMQ管理后台,点对点模型页面,myQueue队列待消费消息0条,总入队消息1条,总出队消息1条,消费者1只
image.png
消息生产消费都正常,测试通过

发布-订阅模型实例

(1)编写发布-订阅模型(又称主题模型)发布者

  1. public class ActiveMQPublisher {
  2. public static final String BROKER_URL = "tcp://127.0.0.1:61616";
  3. public static final String USERNAME = "admin";
  4. public static final String PASSWORD = "admin";
  5. //相当于一个数据库,主题名字
  6. public static final String DESTINATION = "myTopic";
  7. public static void main(String[] args) {
  8. // 连接工厂
  9. ConnectionFactory connectionFactory;
  10. // 连接
  11. Connection connection = null;
  12. // 会话,接受或者发送消息的线程
  13. Session session=null;
  14. // 消息的目的地
  15. Destination destination=null;
  16. // 消息生产者
  17. MessageProducer messageProducer=null;
  18. try {
  19. // 实例化工厂
  20. connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
  21. // 获取连接
  22. connection = connectionFactory.createConnection(USERNAME, PASSWORD);
  23. // 启动连接
  24. connection.start();
  25. // 创建session(非事务模式,自动确认消息)
  26. session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
  27. // 创建一个消息队列,即目的地
  28. destination = session.createTopic(DESTINATION);
  29. // 创建消息生产者
  30. messageProducer = session.createProducer(destination);
  31. // 持续输入消息并发布
  32. Scanner scanner = new Scanner(System.in);
  33. String messageText = "";
  34. do{
  35. System.out.println("请发布内容:");
  36. messageText= scanner.nextLine();
  37. // 发送消息,没有返回值,是非阻塞的
  38. sendMessage(session, messageProducer,messageText);
  39. }while(!messageText.equals("拜拜")); // 如果输入拜拜则直接退出
  40. } catch (JMSException e) {
  41. // TODO Auto-generated catch block
  42. e.printStackTrace();
  43. }finally {
  44. try {
  45. if(messageProducer != null){
  46. messageProducer.close();
  47. }
  48. if(session != null){
  49. session.close();
  50. }
  51. if(connection != null){
  52. connection.close();
  53. }
  54. } catch (JMSException e) {
  55. e.printStackTrace();
  56. }
  57. }
  58. }
  59. /**
  60. * 发送消息
  61. * @param session 会话
  62. * @param messageProducer 生产者对象
  63. * @param messageText 消息内容
  64. * @throws JMSException
  65. */
  66. public static void sendMessage(Session session,MessageProducer messageProducer,String messageText) throws JMSException{
  67. TextMessage message = session.createTextMessage(messageText);
  68. //发出消息
  69. messageProducer.send(message);
  70. System.out.println("生产者发送消息:"+messageText);
  71. }
  72. }

(2)编写发布-订阅模型(又称主题模型)订阅者

  1. public class ActiveMQSubcriber {
  2. public static final String BROKER_URL = "tcp://127.0.0.1:61616";
  3. public static final String USERNAME = "admin";
  4. public static final String PASSWORD = "admin";
  5. //相当于一个数据库,主题名字
  6. public static final String DESTINATION = "myTopic";
  7. public static void main(String[] args) {
  8. // 创建多个订阅者订阅同一个主题
  9. for(int i=0;i<5;i++){
  10. new TopicSubcriber(i+"号", BROKER_URL, USERNAME, PASSWORD,DESTINATION).receiveMessage();
  11. }
  12. }
  13. static class TopicSubcriber{
  14. // 连接工厂
  15. private ConnectionFactory connectionFactory;
  16. // 连接
  17. private Connection connection = null;
  18. // 会话,接受或者发送消息的线程
  19. private Session session = null;
  20. // 消息的目的地
  21. private Destination destination;
  22. // 消息消费者
  23. private MessageConsumer messageConsumer = null;
  24. private String brokerUrl,userName,password,destinationName;
  25. // 消费者名称
  26. private String consumerName;
  27. TopicSubcriber(String consumerName,String brokerUrl,String userName,String password,String destinationName){
  28. this.brokerUrl = brokerUrl;
  29. this.userName = userName;
  30. this.password = password;
  31. this.destinationName = destinationName;
  32. this.consumerName = consumerName;
  33. }
  34. public void receiveMessage(){
  35. try {
  36. // 实例化工厂
  37. connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);
  38. // 获取连接
  39. connection = connectionFactory.createConnection(this.userName,this.password);
  40. // 接收消息,需要将连接启动一下,才可以接收到消息
  41. connection.start();
  42. // 创建一个Session 第一个参数:是否是事务消息 第二个参数:消息确认机制(自动确认还是手动确认)
  43. session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
  44. // 有了session之后,就可以创建消息,目的地,生产者和消费者
  45. destination = session.createTopic(this.destinationName);
  46. // 消费者
  47. messageConsumer = session.createConsumer(destination);
  48. System.out.println("消费者"+consumerName+"已订阅");
  49. // 设置监听器,持续监听主题是否有新消息发布
  50. messageConsumer.setMessageListener(new MessageListener() {
  51. public void onMessage(Message message) {
  52. // 判断消息类型
  53. if(message instanceof TextMessage){
  54. String text = null;
  55. try {
  56. text = ((TextMessage) message).getText();
  57. System.out.println("我是消费者"+consumerName+",收到生产者发来消息:"+text);
  58. } catch (JMSException e) {
  59. e.printStackTrace();
  60. }
  61. }
  62. }
  63. });
  64. } catch (JMSException e) {
  65. // TODO Auto-generated catch block
  66. e.printStackTrace();
  67. }
  68. }
  69. }
  70. }

(3)先启动订阅者实例ActiveMQSubcriber,连接MQ并订阅主题:
image.png
ActiveMQ管理后台,发布-订阅模型页面出现一个主题,名称就叫myTopic,总入队消息0条,订阅者5个
image.png

(4)运行发布者实例ActiveMQPublisher,连接MQ并发送消息:

此时myTopic主题中多了一个消息,同时被消费了五次
image.png
订阅者全部接收到消息
image.png

JMS事务

之所以放在这里,是因为对于JMS事务,只有实践过一遍才能有所理解。

我们知道,JMS中,Session提供了事务功能,所以JMS的事务实际上就是基于Session去实现的

为什么需要事务?

我们知道,JMS是一个Java平台中关于面向消息中间件(MOM)的API用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信

普通消息的处理流程
JMS-消息服务 - 图11
(1)消息生成者发送消息
(2)MQ收到消息,将消息进行持久化,在存储中新增一条记录
(3)返回ACK给生产者
(4)MQ push 消息给对应的消费者,然后等待消费者返回ACK
(5)如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤
(6)MQ删除消息

这种方式看似没问题,实际上存在一致性问题(分布式系统的通病),即本地生产者的消息发送与本地业务状态不一致

我们以订单创建为例,订单系统先创建订单(本地事务),再发送消息给下游处理;如果订单创建成功,然而消息没有发送出去,那么下游所有系统都无法感知到这个事件,会出现脏数据;

  1. public void orderProcess(){
  2. // 下订单
  3. orderService.process();
  4. // 发送消息
  5. messageService.sendMessage();
  6. }

如果先发送订单消息,再创建订单;那么就有可能消息发送成功,但是在订单创建的时候却失败了,此时下游系统却认为这个订单已经创建,也会出现脏数据。

  1. public void orderProcess(){
  2. // 发送消息
  3. messageService.sendMessage();
  4. // 下订单
  5. orderService.process();
  6. }

普通情况下可能导致的情况如下

订单创建成功 消息发送成功 一致
订单创建失败 消息发送失败 一致
订单创建成功 消息发送失败 不一致
消息发送成功 订单创建失败 不一致

普通处理方案尝试

那么我们自然而然就会想到将消息发送和业务处理放在同一个本地事务中来进行处理,如果业务消息发送失败,那么本地事务就回滚

  1. public void orderProcess(){
  2. try{
  3. // 下订单
  4. orderService.process();
  5. // 发送消息
  6. messageService.sendMessage();
  7. }catch(Exception e){
  8. 事务回滚;
  9. }
  10. }

这种做法可能的结果如下:

订单创建成功 消息发送成功 MQ确认存储 本地事务提交 一致
订单创建失败
本地事务回滚 一致
订单创建成功 消息发送失败 本地事务回滚 一致
订单创建成功 消息发送成功 MQ存储成功,但是ACK返回超时/失败 本地事务回滚 不一致

也就是说,上述处理方式的问题点实际上出在MQ对消息的处理上,一旦MQ存储消息成功,但是超时或者ACK返回失败,生产者将认为消息发送失败,导致本地事务回滚,导致MQ中存在脏数据。

JMS事务解决方案

既然本地事务无法解决一致性问题,那就试试消息事务,也就是JMS事务。**

JMS 事务遵从发送操作与接收操作相互分离的约定其中一组消息要么能够保证全部到达消息服务器,要么连一条消息也不能保证到达消息服务器。

从发送者的角度来看,JMS 提供者为这组消息提供了高速缓存,直到执行 commit() 为止****如果发生了故障,或者执行了 rollback(),这些消息就会丢弃在一个事务中传送给消息服务器的消息,它并不会转发给消费者,知道该生产者提交该事务为止。(也就是说:在事务未提交之前,消息时不会被持久化存储的,也不会被消费者消费。)

它实现了消息生成者本地事务与消息发送的原子性,保证了消息生成者本地事务处理成功与消息发送成功的最终一致性问题

而且由于JMS 事务遵从发送操作与接收操作相互分离的约定,所以生产者无需理会消息的消费问题,只需要保证生产的消息能够成功存放到MQ即可。

JMS创建事务—-Connection.createSession(boolean var1, int var2)

  1. /**
  2. * 创建会话
  3. * @param var1 是否启用事务
  4. * @param var2 acknowledgment mode(应答模式)
  5. * @throws JMSException
  6. */
  7. Session createSession(boolean var1, int var2) throws JMSException;
  8. /**
  9. * 创建会话
  10. * @param var1 acknowledgment mode(应答模式)
  11. * @throws JMSException
  12. */
  13. Session createSession(int var1) throws JMSException;
  14. /**
  15. * 创建会话
  16. * @throws JMSException
  17. */
  18. Session createSession() throws JMSException;

boolean var1**:设置为true时,表示开启事务,此时会忽略第二个参数,默认将acknowledgment mode设置为Session.SESSION_TRANSACTED;

设置为false时表示不开启事务

int var2**:设置acknowledgment mode(应答模式),当var1设置为false时,可以设置为Session.AUTO_ACKNOWLEDGE,Session.CLTENT_ACKNOWLEDGE,Session.DUPS_OK_ACKNOWLEDGE其中一个。

commit和rollback

在一个JMS客户端,可以使用本地事务来组合消息的发送和接收,JMS Session接口提供了commit和rollback方法。

  开启事务之后,JMS Provider会缓存每个生产者当前生产的所有消息,直到commit或rollback。
**

  • commit:操作将会导致生产者事务中所有的消息被持久存储,消费者的所有消息都被确认。
  • rollback:操作将会导致生产者事务中所有的消息被清除,消费者的所有消息不被确认。

JMS事务应答模式(消息确认模式)

以下说明,都是从消费者的角度去分析,与生产者无关。

  • Session.SESSION_TRANSACTED当一个事务被commit的时候,消息确认就会自动发生。如果开启了事务,最后没有执行commit方法,那么消费者会重复消费该消息。

  • Session.AUTO_ACKNOWLEDGE客户(消费者)成功从receive方法返回时,或者从MessageListener.onMessage方法成功返回时,会话自动确认消息

  • Session.CITENT_ACKNOWLEDGE客户(消费者)通过显式调用消息的acknowledge方法确认消息。

  • Session.DUPS_OK_ACKNOWLEDGE(很少用):“懒散式”消息确认,消息可能会重复发送,在第二次重新传送消息时,消息头的JMSRedelivered会被置为true标识当前消息已经传送过一次客户(消费者)需要进行消息的重复处理控制。

小实验

将上述实例中的队列生产者代码更改如下

开启事务:
image.png
只有输入“拜拜”才会执行事务提交
image.png
运行生产者并发送第一条消息:
image.png
队列中没有记录消息(因为生产者还没提交事务)
image.png
消费者也没有消费到消息:
image.png
多输两条然后提交事务咯:
image.png
消费者消费了四条数据
image.png
此时管理后台出现了4条入队消息
image.png
为什么管理出队消息为0?因为消费者这边也设置了开启事务,而在消费消息的时候却没有提交事务
image.png
image.png

至此,JMS相关知识基本完成学习,后续文章学习消息中间件相关内容

有关JMS的更多内容可以参考这个网站:http://www.bjpowernode.com/tutorial_activemq/868.html