1. 环境:
  2. 低配也需要3台服务器!!!!3
  3. 3台服务器 centos7 jdk1.6
  4. zookeeper安装包3.14 activemq安装包 5.9
  5. jdkactivemq安装包对比图!!!一定要看!!!!
  6. 采用zookeeper去管理activemq 必须是5.9开始的版本!!!只有5.9之后才支持!!!
  7. 作者踩过的坑 采用了activemq5.11的安装包 启动不了 查看jdk版本太低
  8. 采用5.8 的安装包 无法解析xml levelDB 等错误信息
  9. apache-activemq-5.0.0 1.5.0_12 1.5+
  10. apache-activemq-5.1.0 1.5.0_12 1.5+
  11. apache-activemq-5.2.0 1.5.0_15 1.5+
  12. apache-activemq-5.3.0 1.5.0_17 1.5+
  13. apache-activemq-5.4.0 1.5.0_19 1.5+
  14. apache-activemq-5.5.0 1.6.0_23 1.6+
  15. apache-activemq-5.6.0 1.6.0_26 1.6+
  16. apache-activemq-5.7.0 1.6.0_33 1.6+
  17. apache-activemq-5.8.0 1.6.0_37 1.6+
  18. apache-activemq-5.9.0 1.6.0_51 1.6+
  19. apache-activemq-5.10.0 1.7.0_12-ea 1.7+
  20. apache-activemq-5.11.0 1.7.0_60 1.7+
  21. apache-activemq-5.12.0 1.7.0_80 1.7+
  22. apache-activemq-5.13.0 1.7.0_80 1.7+
  23. apache-activemq-5.14.0 1.7.0_80 1.7+
  24. apache-activemq-5.15.0 1.8.0_112 1.8+
  25. 这里先介绍一下acviteMQ 3种集群模式
  26. 1、默认的单机部署(kahadb
  27. 2、基于zookeeper的主从(levelDB Master/Slave
  28. 3、基于共享数据库的主从(Shared JDBC Master/Slave
  29. 作者采用 的是第二种采用zookeeper实现管理activitemq集群

zookeeper集群配置

  1. 先安装zookeeper集群
  2. 作者习惯的安装目录是local
  3. [root@localhost local]# tar -zxvf zookeeper-**.tar.gz
  4. [root@localhost local]# mv zookeeper-** zookeeper ##取一个简单点的名字
  5. [root@localhost local]# cd zookeeper/conf 进入zookeeperconf配置
  6. [root@localhost conf]# cp zoo_sample.conf zoo.cfg 默认采用zoo_sample.conf 2个文件只能选择一个
  7. [root@localhost conf]# rm -rf zoo_sample.conf 删除自带的zoo_sample.conf
  8. [root@localhost conf]# vim zoo.conf
  9. 添加如下内容 有几个服务就写几个
  10. server.0=master:2888:3888 ##master在这里是因为作者电脑配置了免密所以这样 可以写成ip
  11. server.1=slave:2888:3888 ##slave在这里是因为作者电脑配置了免密所以这样 可以写成ip
  12. server.2=slave:2888:3888
  13. 里面默认的clinet的端口是2181 如果有需要也可以进行修改
  14. 保存退出
  15. [root@localhost conf]#vim ../data/myid 修改data目录的myid
  16. 0
  17. 保存退出
  18. 开始配置环境变量
  19. export ZOOKEEPER_HME=/usr/local/zookeeper
  20. export PATH=$PATH:$ZOOKEEPER/bin
  21. 保存退出
  22. source /etc/profile
  23. 进行分发操作把Azookeeper配置分发到B服务器 需要修改myid的参数值不能重复
  24. ./zkServer start 分别启动2台服务器的zookeeper服务
  25. ./zkServer status 查看启动是否成功 一个领队 一个跟随者
  26. A
  27. ZooKeeper JMX enabled by default
  28. Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
  29. Mode: leader
  30. B
  31. ZooKeeper JMX enabled by default
  32. Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
  33. Mode: follower
  34. zookeeper集群配置到这里就OK

activeMQ集群配置

作者的安装包已经上传了
tar -zxvf  apache-activemq-5.9.0 #解压
cd activitmq/conf  #进入conf目录  修改配置文件
brokerName的名称需要修改 如果是在2台服务器上那么就无所谓了
配置activite集群 最少需要3台服务器   采用zookeeper管理只有brokerName一致才会被认为是集群
 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="master" dataDirectory="${activemq.data}">

 <persistenceAdapter>
         <replicatedLevelDB
                 directory="${activemq.data}/leveldb
                 replicas="3"
                 bind="tcp://0.0.0.0:51121" 集群当中的通讯端口 
                 zkAddress="127.0.0.1:2181,127.0.0.1:2182" zookeeper集群的地址
                 zkPath="/activemq/leveldb-stores"
          />
</persistenceAdapter>
这里的重要的一个属性 需要注意 
replicas : 集群中的节点数【(replicas/2)+1公式表示集群中至少要正常运行的服务数量】, 3台集群那么允许1台宕机, 另外两台要正常运行 这个参数值设置有误则会出现以下提示 提示数量不足:无法确定master 
Not enough cluster members connected to elect a master.

配置完毕 采用scp -r 分发命令 分发到B C服务器
分别启动两台服务器的activitmq服务 打开防火墙的8161端口 默认的clinet端口 提供给客户一个web操作界面
进行测试是否成功
启动3台服务器的mq服务
打开防火墙8161端口  3个ip   只能打开一个 因为只有一个master主机
关闭master主机  剩下2台会再推出一台。继续访问该链接还可以访问那么就配置成功了
关闭master 的服务
再通过该节点访问 如果还能访问该链接 那么配置集群成功

测试代码

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
//发送消息
public class P2PSender {
    private static final String QUEUE = "client1-to-client2";

    public static void main(String[] args) throws Exception {

        //1. 建立一个ConnectionFactory. 默认tcp://0.0.0.0:61616
        String userName = ActiveMQConnectionFactory.DEFAULT_USER;
        String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
        String brokerURL = "failover:(tcp://192.168.99.1:61616,tcp://192.168.99.2:61616,tcp:/192.168.99.3:61616)?Randomize=false";
//作者这里采用的默认配置 也就是有密码  默认activitemq密码是 admin  rabbitmq的默认的是guest
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "admin", "admin", brokerURL );

        //2. 通过ConnectionFactory建立一个Connection连接,并且调用start方法开启
        Connection connection = connectionFactory.createConnection();
        connection.start();

        //3. 通过Connection创建Session,用于接收消息[第一个参数:是否启用事务;第二个参数:设置签收模式]
//        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Session session = connection.createSession( false, Session.CLIENT_ACKNOWLEDGE );//手工签收--常用

        //4. 通过Session创建Destination对象
        Destination destination = session.createQueue( "foodQueue" );

        //5. 通过Session创建发送或接受对象
        MessageProducer messageProducer = session.createProducer( null );

        //7. 使用JMS规范里面消息类型之一 TextMessage 来创建数据,用MessageProducer来发送
        for (int i = 1; i < 500; i++) {
            TextMessage message = session.createTextMessage();
            message.setText( "好了 测试成功了" + i );
            //参数:目标,消息,传递数据的模式,优先级,消息的过期时间
            messageProducer.send( destination, message, DeliveryMode.NON_PERSISTENT, 0, 1000 * 60 );
            //System.out.println("生产者:" + message.getText());
        }

        //使用事务要手动提交
        //session.commit();

        //8. 关闭连接
        connection.close();
    }
}



import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class P2PReceiver {
    private static final String QUEUE = "client1-to-client2";

    public static void main(String[] args) throws Exception {
        //1. 建立一个ConnectionFactory. tcp://0.0.0.0:61616
        String userName = ActiveMQConnectionFactory.DEFAULT_USER;
        String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
        String brokerURL = "failover:(tcp://192.168.99.1:61616,tcp://192.168.99.2:61616,tcp://192.168.99.3:61616)?Randomize=false";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "admin", "admin", brokerURL );

        //2. 通过ConnectionFactory建立一个Connection连接,并且调用start方法开启
        Connection connection = connectionFactory.createConnection();
        connection.start();

        //3. 通过Connection创建Session,用于接收消息[第一个参数:是否启用事务;第二个参数:设置签收模式]
        Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE );

        //4. 通过Session创建Destination对象
        Destination destination = session.createQueue( "foodQueue" );

        //5. 通过Session创建发送或接受对象
        MessageConsumer messageConsumer = session.createConsumer( destination );

        //7. 使用JMS规范里面消息类型之一 TextMessage 来创建数据
        while (true) {
            TextMessage message = (TextMessage) messageConsumer.receive();
            if (message == null) break;
            System.out.println( "消费者:" + message.getText() );
        }

        //8. 关闭连接
        connection.close();
    }
}