window下 ActiveMQ安装

ActiveMQ部署其实很简单,和所有Java一样,要跑java程序就必须先安装JDK并配置好环境变量,这个很简单。
然后解压下载的apache-activemq-5.10-20140603.133406-78-bin.zip压缩包到一个目录,得到解压后的目录结构如下图:
image.png

进入bin目录,发现有win32和win64两个文件夹,这2个文件夹分别对应windows32位和windows64位操作系统的启动脚本。

image.png

我的实验环境是windowsXP,就进入win32目录,会看到如下目录结构。

image.png

其中activemq.bat便是启动脚本,双击启动。

image.png

ActiveMQ默认启动到8161端口,启动完了后在浏览器地址栏输入:http://localhost:8161/admin要求输入用户名密码,默认用户名密码为admin、admin,这个用户名密码是在conf/users.properties中配置的。输入用户名密码后便可看到如下图的ActiveMQ控制台界面了。

控制台介绍

ActiveMQ - 图5

Number Of Consumers 消费者 这个是消费者端的消费者数量
Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数 Messages Enqueued 进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减
Messages Dequeued 出了队列的消息 可以理解为是消费这消费掉的数量

这个要分两种情况理解

在queues里它和进入队列的总数量相等(因为一个消息只会被成功消费一次),如果暂时不等是因为消费者还没来得及消费。
在 topics里 它因为多消费者从而导致数量会比入队列数高。
简单的理解上面的意思就是
当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。
当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1.
在来一条消息时,等待消费的消息是1,进入队列的消息就是2.

没有消费者时 Pending Messages 和 入队列数量一样
有消费者消费的时候 Pedding会减少 出队列会增加
到最后 就是 入队列和出队列的数量一样多
以此类推,进入队列的消息和出队列的消息是池子,等待消费的消息是水流。

实现点对点通讯模式

使用ActiveMQ完成点对点(p2p)通讯模式

引入pom文件依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.activemq</groupId>
  4. <artifactId>activemq-core</artifactId>
  5. <version>5.7.0</version>
  6. </dependency>
  7. </dependencies>

生产者

  1. public class Producter {
  2. public static void main(String[] args) throws JMSException {
  3. // ConnectionFactory :连接工厂,JMS 用它创建连接
  4. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
  5. ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
  6. // JMS 客户端到JMS Provider 的连接
  7. Connection connection = connectionFactory.createConnection();
  8. connection.start();
  9. // Session: 一个发送或接收消息的线程
  10. Session session = connection.createSession(Boolean.falst, Session.AUTO_ACKNOWLEDGE);
  11. // Destination :消息的目的地;消息发送给谁.
  12. // 获取session注意参数值my-queue是Query的名字
  13. Destination destination = session.createQueue("my-queue");
  14. // MessageProducer:消息生产者
  15. MessageProducer producer = session.createProducer(destination);
  16. // 设置不持久化
  17. producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  18. // 发送一条消息
  19. for (int i = 1; i <= 5; i++) {
  20. sendMsg(session, producer, i);
  21. }
  22. session.commit();
  23. connection.close();
  24. }
  25. /**
  26. * 在指定的会话上,通过指定的消息生产者发出一条消息
  27. *
  28. * @param session
  29. * 消息会话
  30. * @param producer
  31. * 消息生产者
  32. */
  33. public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
  34. // 创建一条文本消息
  35. TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
  36. // 通过消息生产者发出消息
  37. producer.send(message);
  38. }
  39. }

消费者

  1. public class JmsReceiver {
  2. public static void main(String[] args) throws JMSException {
  3. // ConnectionFactory :连接工厂,JMS 用它创建连接
  4. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
  5. ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
  6. // JMS 客户端到JMS Provider 的连接
  7. Connection connection = connectionFactory.createConnection();
  8. connection.start();
  9. // Session: 一个发送或接收消息的线程
  10. Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
  11. // Destination :消息的目的地;消息发送给谁.
  12. // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
  13. Destination destination = session.createQueue("my-queue");
  14. // 消费者,消息接收者
  15. MessageConsumer consumer = session.createConsumer(destination);
  16. while (true) {
  17. TextMessage message = (TextMessage) consumer.receive();
  18. if (null != message) {
  19. System.out.println("收到消息:" + message.getText());
  20. session.commit();
  21. } else
  22. break;
  23. }
  24. session.close();
  25. connection.close();
  26. }
  27. }

JMS消息可靠机制

ActiveMQ消息签收机制:

客戶端成功接收一条消息的标志是一条消息被签收,成功应答。
消息的签收情形分两种:

1、带事务的session
如果session带有事务,并且事务成功提交,则消息被自动签收。如果事务回滚,则消息会被再次传送。

2、不带事务的session
不带事务的session的签收方式,取决于session的配置。

Activemq支持一下三种模式:

Session.AUTO_ACKNOWLEDGE 消息自动签收
Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收
Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送。在第二次重新传递消息的时候,消息头 的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。

发布订阅

生产者

  1. public class TOPSend {
  2. private static String BROKERURL = "tcp://127.0.0.1:61616";
  3. private static String TOPIC = "my-topic";
  4. public static void main(String[] args) throws JMSException {
  5. start();
  6. }
  7. static public void start() throws JMSException {
  8. System.out.println("生产者已经启动....");
  9. // 创建ActiveMQConnectionFactory 会话工厂
  10. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
  11. ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
  12. Connection connection = activeMQConnectionFactory.createConnection();
  13. // 启动JMS 连接
  14. connection.start();
  15. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  16. MessageProducer producer = session.createProducer(null);
  17. producer.setDeliveryMode(DeliveryMode.PERSISTENT);
  18. send(producer, session);
  19. System.out.println("发送成功!");
  20. connection.close();
  21. }
  22. static public void send(MessageProducer producer, Session session) throws JMSException {
  23. for (int i = 1; i <= 5; i++) {
  24. System.out.println("我是消息" + i);
  25. TextMessage textMessage = session.createTextMessage("我是消息" + i);
  26. Destination destination = session.createTopic(TOPIC);
  27. producer.send(destination, textMessage);
  28. }
  29. }
  30. }

消费者:

  1. public class TopReceiver {
  2. private static String BROKERURL = "tcp://127.0.0.1:61616";
  3. private static String TOPIC = "my-topic";
  4. public static void main(String[] args) throws JMSException {
  5. start();
  6. }
  7. static public void start() throws JMSException {
  8. System.out.println("消费点启动...");
  9. // 创建ActiveMQConnectionFactory 会话工厂
  10. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
  11. ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
  12. Connection connection = activeMQConnectionFactory.createConnection();
  13. // 启动JMS 连接
  14. connection.start();
  15. // 不开消息启事物,消息主要发送消费者,则表示消息已经签收
  16. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  17. // 创建一个队列
  18. Topic topic = session.createTopic(TOPIC);
  19. MessageConsumer consumer = session.createConsumer(topic);
  20. // consumer.setMessageListener(new MsgListener());
  21. while (true) {
  22. TextMessage textMessage = (TextMessage) consumer.receive();
  23. if (textMessage != null) {
  24. System.out.println("接受到消息:" + textMessage.getText());
  25. // textMessage.acknowledge();// 手动签收
  26. // session.commit();
  27. } else {
  28. break;
  29. }
  30. }
  31. connection.close();
  32. }
  33. }