什么是Zookeeper

Zookeeper是一个分布式开源框架,提供了协调分布式应用的基本服务,它向外部应用暴露一组通用服务——分布式同步(Distributed Synchronization)、命名服务(Naming Service)、集群维护(Group Maintenance)等,简化分布式应用协调及其管理的难度,提供高性能的分布式服务。ZooKeeper本身可以以单机模式安装运行,不过它的长处在于通过分布式ZooKeeper集群(一个Leader,多个Follower),基于一定的策略来保证ZooKeeper集群的稳定性和可用性,从而实现分布式应用的可靠性。
1、Zookeeper是为别的分布式程序服务的
2、Zookeeper本身就是一个分布式程序(只要有半数以上节点存活,zk就能正常服务)
3、Zookeeper所提供的服务涵盖:主从协调、服务器节点动态上下线、统一配置管理、分布式共享锁、统> 一名称服务等
4、虽然说可以提供各种服务,但是zookeeper在底层其实只提供了两个功能:
管理(存储,读取)用户程序提交的数据(类似namenode中存放的metadata);
并为用户程序提供数据节点监听服务;

Zookeeper集群机制

Zookeeper集群的角色: Leader 和 follower
只要集群中有半数以上节点存活,集群就能提供服务

Zookeeper特性

1、Zookeeper:一个leader,多个follower组成的集群
2、全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的
3、分布式读写,更新请求转发,由leader实施
4、更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行
5、数据更新原子性,一次数据更新要么成功,要么失败
6、实时性,在一定时间范围内,client能读到最新数据

Zookeeper数据结构

1、层次化的目录结构(图数据结构),命名符合常规文件系统规范(类似文件系统)
image.png
2、每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识
3、节点Znode可以包含数据和子节点(但是EPHEMERAL类型的节点不能有子节点)
节点类型
a、Znode有两种类型:
短暂(ephemeral)(create -e /app1/test1 “test1” 客户端断开连接zk删除ephemeral类型节点)
持久(persistent) (create -s /app1/test2 “test2” 客户端断开连接zk不删除persistent类型节点)
b、Znode有四种形式的目录节点(默认是persistent )
PERSISTENT
PERSISTENT_SEQUENTIAL(持久序列/test0000000019 )
EPHEMERAL
EPHEMERAL_SEQUENTIAL
c、创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护
image.png
d、在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序

Zookeeper应用场景

数据发布与订阅(配置中心)

发布与订阅模型,即所谓的配置中心,顾名思义就是发布者将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。例如全局的配置信息,服务式服务框架的服务地址列表等就非常适合使用。

负载均衡

这里说的负载均衡是指软负载均衡。在分布式环境中,为了保证高可用性,通常同一个应用或同一个服务的提供方都会部署多份,达到对等服务。而消费者就须要在这些对等的服务器中选择一个来执行相关的业务逻辑,其中比较典型的是消息中间件中的生产者,消费者负载均衡。
消息中间件中发布者和订阅者的负载均衡,linkedin开源的KafkaMQ和阿里开源的 metaq都是通过zookeeper来做到生产者、消费者的负载均衡。这里以metaq为例如讲下:
生产者负载均衡:metaq发送消息的时候,生产者在发送消息的时候必须选择一台broker上的一个分区来发送消息,因此metaq在运行过程中,会把所有broker和对应的分区信息全部注册到ZK指定节点上,默认的策略是一个依次轮询的过程,生产者在通过ZK获取分区列表之后,会按照brokerId和partition的顺序排列组织成一个有序的分区列表,发送的时候按照从头到尾循环往复的方式选择一个分区来发送消息。
消费负载均衡: 在消费过程中,一个消费者会消费一个或多个分区中的消息,但是一个分区只会由一个消费者来消费。MetaQ的消费策略是:

  1. 每个分区针对同一个group只挂载一个消费者。
  2. 如果同一个group的消费者数目大于分区数目,则多出来的消费者将不参与消费。
  3. 如果同一个group的消费者数目小于分区数目,则有部分消费者需要额外承担消费任务。
    在某个消费者故障或者重启等情况下,其他消费者会感知到这一变化(通过 zookeeper watch消费者列表),然后重新进行负载均衡,保证所有的分区都有消费者进行消费。
    命名服务(Naming Service)

命名服务也是分布式系统中比较常见的一类场景。在分布式系统中,通过使用命名服务,客户端应用能够根据指定名字来获取资源或服务的地址,提供者等信息。被命名的实体通常可以是集群中的机器,提供的服务地址,远程对象等等——这些我们都可以统称他们为名字(Name)。其中较为常见的就是一些分布式服务框架中的服务地址列表。通过调用ZK提供的创建节点的API,能够很容易创建一个全局唯一的path,这个path就可以作为一个名称。
阿里巴巴集团开源的分布式服务框架Dubbo中使用ZooKeeper来作为其命名服务,维护全局的服务地址列表, 点击这里查看Dubbo开源项目。在Dubbo实现中:
服务提供者在启动的时候,向ZK上的指定节点/dubbo/${serviceName}/providers目录下写入自己的URL地址,这个操作就完成了服务的发布。
服务消费者启动的时候,订阅/dubbo/${serviceName}/providers目录下的提供者URL地址, 并向/dubbo/${serviceName} /consumers目录下写入自己的URL地址。
注意,所有向ZK上注册的地址都是临时节点,这样就能够保证服务提供者和消费者能够自动感应资源的变化。 另外,Dubbo还有针对服务粒度的监控,方法是订阅/dubbo/${serviceName}目录下所有提供者和消费者的信息。

分布式通知/协调

ZooKeeper中特有watcher注册与异步通知机制,能够很好的实现分布式环境下不同系统之间的通知与协调,实现对数据变更的实时处理。使用方法通常是不同系统都对ZK上同一个znode进行注册,监听znode的变化(包括znode本身内容及子节点的),其中一个系统update了znode,那么另一个系统能够收到通知,并作出相应处理

  1. 另一种心跳检测机制:检测系统和被检测系统之间并不直接关联起来,而是通过zk上某个节点关联,大大减少系统耦合。
  2. 另一种系统调度模式:某系统有控制台和推送系统两部分组成,控制台的职责是控制推送系统进行相应的推送工作。管理人员在控制台作的一些操作,实际上是修改了ZK上某些节点的状态,而ZK就把这些变化通知给他们注册Watcher的客户端,即推送系统,于是,作出相应的推送任务。
  3. 另一种工作汇报模式:一些类似于任务分发系统,子任务启动后,到zk来注册一个临时节点,并且定时将自己的进度进行汇报(将进度写回这个临时节点),这样任务管理者就能够实时知道任务进度。
    总之,使用zookeeper来进行分布式通知和协调能够大大降低系统之间的耦合
    集群管理与Master选举
  4. 集群机器监控:这通常用于那种对集群中机器状态,机器在线率有较高要求的场景,能够快速对集群中机器变化作出响应。这样的场景中,往往有一个监控系统,实时检测集群机器是否存活。过去的做法通常是:监控系统通过某种手段(比如ping)定时检测每个机器,或者每个机器自己定时向监控系统汇报“我还活着”。 这种做法可行,但是存在两个比较明显的问题:
  5. 集群中机器有变动的时候,牵连修改的东西比较多。
  6. 有一定的延时。
    利用ZooKeeper有两个特性,就可以实现另一种集群机器存活性监控系统:
  7. 客户端在节点 x 上注册一个Watcher,那么如果 x?的子节点变化了,会通知该客户端。
  8. 创建EPHEMERAL类型的节点,一旦客户端和服务器的会话结束或过期,那么该节点就会消失。
    例如,监控系统在 /clusterServers 节点上注册一个Watcher,以后每动态加机器,那么就往 /clusterServers 下创建一个 EPHEMERAL类型的节点:/clusterServers/{hostname}. 这样,监控系统就能够实时知道机器的增减情况,至于后续处理就是监控系统的业务了。
  9. Master选举则是zookeeper中最为经典的应用场景了。
    在分布式环境中,相同的业务应用分布在不同的机器上,有些业务逻辑(例如一些耗时的计算,网络I/O处理),往往只需要让整个集群中的某一台机器进行执行,其余机器可以共享这个结果,这样可以大大减少重复劳动,提高性能,于是这个master选举便是这种场景下的碰到的主要问题。
    利用ZooKeeper的强一致性,能够保证在分布式高并发情况下节点创建的全局唯一性,即:同时有多个客户端请求创建 /currentMaster 节点,最终一定只有一个客户端请求能够创建成功。利用这个特性,就能很轻易的在分布式环境中进行集群选取了。
    另外,这种场景演化一下,就是动态Master选举。这就要用到EPHEMERAL_SEQUENTIAL类型节点的特性了。
    上文中提到,所有客户端创建请求,最终只有一个能够创建成功。在这里稍微变化下,就是允许所有请求都能够创建成功,但是得有个创建顺序,于是所有的请求最终在ZK上创建结果的一种可能情况是这样: /currentMaster/{sessionId}-1 ,/currentMaster/{sessionId}-2,/currentMaster/{sessionId}-3 ….. 每次选取序列号最小的那个机器作为Master,如果这个机器挂了,由于他创建的节点会马上小时,那么之后最小的那个机器就是Master了。
  10. 在搜索系统中,如果集群中每个机器都生成一份全量索引,不仅耗时,而且不能保证彼此之间索引数据一致。因此让集群中的Master来进行全量索引的生成,然后同步到集群中其它机器。另外,Master选举的容灾措施是,可以随时进行手动指定master,就是说应用在zk在无法获取master信息时,可以通过比如http方式,向一个地方获取master。
  11. 在Hbase中,也是使用ZooKeeper来实现动态HMaster的选举。在Hbase实现中,会在ZK上存储一些ROOT表的地址和HMaster的地址,HRegionServer也会把自己以临时节点(Ephemeral)的方式注册到Zookeeper中,使得HMaster可以随时感知到各个HRegionServer的存活状态,同时,一旦HMaster出现问题,会重新选举出一个HMaster来运行,从而避免了HMaster的单点问题

分布式锁

分布式锁,这个主要得益于 ZooKeeper 为我们保证了数据的强一致性。锁服务可以分为两类,一个是 保持独占,另一个是 控制时序。

  1. 所谓保持独占,就是所有试图来获取这个锁的客户端,最终只有一个可以成功获得这把锁。通常的做法是把 zk 上的一个 znode 看作是一把锁,通过 create znode 的方式来实现。所有客户端都去创建 /distribute_lock 节点,最终成功创建的那个客户端也即拥有了这把锁。
  2. 控制时序,就是所有视图来获取这个锁的客户端,最终都是会被安排执行,只是有个全局时序了。做法和上面基本类似,只是这里 /distributelock 已经预先存在,客户端在它下面创建临时有序节点(这个可以通过节点的属性控制:CreateMode.EPHEMERALSEQUENTIAL 来指定)。Zk 的父节点(/distribute_lock)维持一份 sequence, 保证子节点创建的时序性,从而也形成了每个客户端的全局时序。

zookeeper公平可重入分布式锁

  1. /**
  2. * @author wangyanqing
  3. * @date 2020/9/11
  4. * @description TODO zookeeper实现的公平可重入分布式锁
  5. * <p>
  6. * 我们通过重写接口中的方法实现一个可重入锁。
  7. * lock:请求锁,如果成功则直接返回,不成功则阻塞 直到获取锁。
  8. * lockInterruptibly:请求锁,如果失败则一直阻塞等待 直到获取锁或线程中断
  9. * tryLock:1、尝试获取锁,获取失败的话 直接返回false,不会再等待。2、尝试获取锁,获取成功返回true,否则一直请求,直到超时返回false
  10. * unlock:释放锁
  11. */
  12. public class DistributedFairLock implements Lock {
  13. private static Logger logger = LoggerFactory.getLogger(DistributedFairLock.class);
  14. //ZooKeeper客户端,进行ZooKeeper操作
  15. private ZooKeeper zooKeeper;
  16. //根节点名称
  17. private String dir;
  18. //加锁节点
  19. private String node;
  20. //ZooKeeper鉴权信息
  21. private List<ACL> acls;
  22. //要加锁节点
  23. private String fullPath;
  24. //加锁标识,为0时表示未获取到锁,每获取一次锁则加一,释放锁时减一。减到0时断开连接,删除临时节点。
  25. private volatile int state;
  26. //当前锁创建的节点id
  27. private String id;
  28. //通过CountDownLatch阻塞,直到监听上一节点被取消,再进行后续操作
  29. private CountDownLatch countDownLatch;
  30. /**
  31. * Constructor.
  32. *
  33. * @param zooKeeper the zoo keeper
  34. * @param dir the dir
  35. * @param node the node
  36. * @param acls the acls
  37. */
  38. public DistributedFairLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
  39. this.zooKeeper = zooKeeper;
  40. this.dir = dir;
  41. this.node = node;
  42. this.acls = acls;
  43. this.fullPath = dir.concat("/").concat(this.node);
  44. init();
  45. }
  46. private void init() {
  47. try {
  48. Stat stat = zooKeeper.exists(dir, false);
  49. if (stat == null) {
  50. zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
  51. }
  52. } catch (Exception e) {
  53. logger.error("[DistributedFairLock#init] error : " + e.toString(), e);
  54. }
  55. }
  56. @Override
  57. public void lock() {
  58. try {
  59. //加锁
  60. synchronized (this) {
  61. //如果当前未持有锁
  62. if (state <= 0) {
  63. //创建节点
  64. if (id == null) {
  65. id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
  66. }
  67. //获取当前路径下所有的节点
  68. List<String> nodes = zooKeeper.getChildren(dir, false);
  69. SortedSet<String> sortedSet = new TreeSet<>();
  70. for (String node : nodes) {
  71. sortedSet.add(dir.concat("/").concat(node));
  72. }
  73. //获取所有id小于当前节点顺序的节点
  74. SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);
  75. if (!lessSet.isEmpty()) {
  76. //监听上一个节点,就是通过这里避免多锁竞争和CPU空转,实现公平锁的
  77. Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
  78. if (stat != null) {
  79. countDownLatch = new CountDownLatch(1);
  80. countDownLatch.await();
  81. }
  82. }
  83. }
  84. state++;
  85. }
  86. } catch (InterruptedException e) {
  87. logger.error("[DistributedFairLock#lock] error : " + e.toString(), e);
  88. Thread.currentThread().interrupt();
  89. } catch (KeeperException ke) {
  90. logger.error("[DistributedFairLock#lock] error : " + ke.toString(), ke);
  91. if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
  92. Thread.currentThread().interrupt();
  93. }
  94. }
  95. }
  96. @Override
  97. public void lockInterruptibly() throws InterruptedException {
  98. }
  99. @Override
  100. public boolean tryLock() {
  101. try {
  102. synchronized (this) {
  103. if (state <= 0) {
  104. if (id == null) {
  105. id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
  106. }
  107. List<String> nodes = zooKeeper.getChildren(dir, false);
  108. SortedSet<String> sortedSet = new TreeSet<>();
  109. for (String node : nodes) {
  110. sortedSet.add(dir.concat("/").concat(node));
  111. }
  112. SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);
  113. if (!lessSet.isEmpty()) {
  114. return false;
  115. }
  116. }
  117. state++;
  118. }
  119. } catch (InterruptedException e) {
  120. logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e);
  121. return false;
  122. } catch (KeeperException ke) {
  123. logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke);
  124. if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
  125. return false;
  126. }
  127. }
  128. return true;
  129. }
  130. @Override
  131. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  132. try {
  133. synchronized (this) {
  134. if (state <= 0) {
  135. if (id == null) {
  136. id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
  137. }
  138. List<String> nodes = zooKeeper.getChildren(dir, false);
  139. SortedSet<String> sortedSet = new TreeSet<>();
  140. for (String node : nodes) {
  141. sortedSet.add(dir.concat("/").concat(node));
  142. }
  143. SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);
  144. if (!lessSet.isEmpty()) {
  145. Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
  146. if (stat != null) {
  147. countDownLatch = new CountDownLatch(1);
  148. countDownLatch.await(time, unit);
  149. }
  150. }
  151. }
  152. state++;
  153. }
  154. } catch (InterruptedException e) {
  155. logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e);
  156. return false;
  157. } catch (KeeperException ke) {
  158. logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke);
  159. if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
  160. return false;
  161. }
  162. }
  163. return true;
  164. }
  165. @Override
  166. public void unlock() {
  167. synchronized (this) {
  168. if (state > 0) {
  169. state--;
  170. }
  171. //当不再持有锁时,删除创建的临时节点
  172. if (state == 0 && zooKeeper != null) {
  173. try {
  174. zooKeeper.delete(id, -1);
  175. id = null;
  176. } catch (Exception e) {
  177. logger.error("[DistributedFairLock#unlock] error : " + e.toString(), e);
  178. }
  179. }
  180. }
  181. }
  182. @Override
  183. public Condition newCondition() {
  184. return null;
  185. }
  186. }
/**
 * @author wangyanqing
 * @date 2020/9/11
 * @description TODO zookeeper分布式锁观察者
 * 通过watch机制避免轮询带来的CPU空转。
 * 通过顺序临时节点避免了羊群效应。
 */
public class LockWatcher implements Watcher {
    //通过CountDownLatch阻塞,直到监听上一节点被取消,再进行后续操作
    private CountDownLatch countDownLatch;
    @Override
    public void process(WatchedEvent event) {
        synchronized (this) {
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
        }
    }
}

zookeeper分布式锁案例代码

/**
 * @author wangyanqing
 * @date 2020/9/11
 * @description TODO zookeeper分布式锁
 */
public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock {
    private CountDownLatch countDownLatch = null;

    @Override
    boolean tryLock() {
        try {
            zkClient.createEphemeral(PATH);
            return true;
        } catch (Exception e) {
//            e.printStackTrace();
            return false;
        }

    }

    @Override
    void waitLock() {
        IZkDataListener izkDataListener = new IZkDataListener() {

            public void handleDataDeleted(String path) throws Exception {
                // 唤醒被等待的线程
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
            }

            public void handleDataChange(String path, Object data) throws Exception {

            }
        };
        // 注册事件
        zkClient.subscribeDataChanges(PATH, izkDataListener);
        if (zkClient.exists(PATH)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // 删除监听
        zkClient.unsubscribeDataChanges(PATH, izkDataListener);
    }
}

/**
 * @author wangyanqing
 * @date 2020/9/11
 * @description TODO
 */
public abstract class ZookeeperAbstractLock implements Lock {
    // zk连接地址
    private static final String CONNECTSTRING = "127.0.0.1:2181";
    // 创建zk连接
    protected ZkClient zkClient = new ZkClient(CONNECTSTRING);
    protected static final String PATH = "/lock";

    public void getLock() {
        if (tryLock()) {
            System.out.println("##获取lock锁的资源####");
        } else {
            // 等待
            waitLock();
            // 重新获取锁资源
            getLock();
        }

    }

    // 获取锁资源
    abstract boolean tryLock();

    // 等待
    abstract void waitLock();

    public void unLock() {
        if (zkClient != null) {
            zkClient.close();
            System.out.println("释放锁资源...");
        }
    }

}

/** 
 * @auther: wangyanqing
 * @description: TODO 
 * @date: 2020/9/11
*/
public class OrderNumGenerator {

    // 生成订单号规则
    private static int count = 0;

    public String getNumber() {
        try {
            Thread.sleep(200);
        } catch (Exception e) {

        }
        SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
        return simpt.format(new Date()) + "-" + ++count;
    }

}

/**
 * @author wangyanqing
 * @date 2020/9/11
 * @description TODO
 */
public class OrderService implements Runnable {
    private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
    // 使用lock锁
    // private java.util.concurrent.locks.Lock lock = new ReentrantLock();
    private Lock lock = new ZookeeperDistrbuteLock();

    public void run() {
        getNumber();
    }

    public void getNumber() {
        try {
            lock.getLock();
            String number = orderNumGenerator.getNumber();
            System.out.println(Thread.currentThread().getName() + ",生成订单ID:" + number);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unLock();
        }
    }

    public static void main(String[] args) {
        System.out.println("####生成唯一订单号###");
//        OrderService orderService = new OrderService();
        for (int i = 0; i < 100; i++) {
            new Thread(new OrderService()).start();
        }
    }
}

/**
 * @author wangyanqing
 * @date 2020/9/11
 * @description TODO
 */
public interface Lock {
    //获取到锁的资源
    public void getLock();
    // 释放锁
    public void unLock();
}

Zookeeper安装

Zookeeper windows环境安装

环境要求:jdk1.8.0_71
1.安装jdk
2.安装Zookeeper. 在官网http://zookeeper.apache.org/下载zookeeper.我下载的是zookeeper-3.4.6版本。
解压zookeeper-3.4.6至D:\machine\zookeeper-3.4.6.
在D:\machine 新建data及log目录。
3.ZooKeeper的安装模式分为三种,分别为:单机模式(stand-alone)、集群模式和集群伪分布模式。ZooKeeper 单机模式的安装相对比较简单,如果第一次接触ZooKeeper的话,建议安装ZooKeeper单机模式或者集群伪分布模式。
安装单击模式。 至D:\machine\zookeeper-3.4.6\conf 复制 zoo_sample.cfg 并粘贴到当前目录下,命名zoo.cfg.

Zookeeper集群环境搭建(linux)

环境要求:jdk1.8.0_71
结构:一共三个节点
(zk服务器集群规模不小于3个节点),要求服务器之间系统时间保持一致。
上传zk并且解压
进行解压: tar -zxvf zookeeper-3.4.10.tar.gz
重命名: mv zookeeper-3.4.10 zookeeper

修改zookeeper环境变量

vi /etc/profile
export JAVA_HOME=/opt/jdk1.8.0_71
export ZOOKEEPER_HOME=/usr/local/zookeeper
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$PATH
source /etc/profile

修改zoo_sample.cfg文件

cd /usr/local/zookeeper/confmv zoo_sample.cfg zoo.cfg修改conf: vi zoo.cfg 修改两处(1) dataDir=/usr/local/zookeeper/data(注意同时在zookeeper创建data目录)(2)最后面添加server.0=bhz:2888:3888server.1=hadoop1:2888:3888server.2=hadoop2:2888:3888

创建服务器标识

服务器标识配置:创建文件夹: mkdir data创建文件myid并填写内容为0: vimyid (内容为服务器标识 : 0)

复制zookeeper

进行复制zookeeper目录到hadoop01和hadoop02
还有/etc/profile文件
把hadoop01、 hadoop02中的myid文件里的值修改为1和2
路径(vi /usr/local/zookeeper/data/myid)

启动zookeeper

路径: /usr/local/zookeeper/bin
执行: zkServer.sh start
(注意这里3台机器都要进行启动)
状态: zkServer.sh
status(在三个节点上检验zk的mode,一个leader和俩个follower)

常用命令

zkServer.sh status 查询状态

Zookeeper配置文件介绍

tickTime=2000 #心跳时间,为了确保连接存在的,以毫秒为单位,最小超时时间为两个心跳时间

initLimit=10 #多少个心跳时间内,允许其他server连接并初始化数据,如果ZooKeeper管理的数据较大,则应相应增大这个值

syncLimit=5 #多少个tickTime内,允许follower同步,如果follower落后太多,则会被丢弃。

dataDir=/home/myuser/zooA/data #用于存放内存数据库快照的文件夹,同时用于集群的myid文件也存在这个文件夹里(注意:一个配置文件只能包含一个dataDir字样,即使它被注释掉了。)

clientPort=2181 #服务的监听端口

# ZooKeeper server and its port no. # ZooKeeper ensemble should know about every other machine in the ensemble # specify server id by creating 'myid' file in the dataDir # use hostname instead of IP address for convenient maintenance
server.1=127.0.0.1:2888:3888 
server.2=127.0.0.1:2988:3988  
server.3=127.0.0.1:2088:3088 

#server.A=B:C:D:
#A是一个数字,表示这个是第几号服务器,B是这个服务器的ip地址
#C第一个端口用来集群成员的信息交换,表示的是这个服务器与集群中的Leader服务器交换信息的端口
#D是在leader挂掉时专门用来进行选举leader所用

dataLogDir=/home/myuser/zooA/log #用于单独设置transaction log的目录,transaction log分离可以避免和普通log还有快照的竞争


Java操作Zookeeper

<dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
</dependency>

watcher

在ZooKeeper中,接口类Watcher用于表示一个标准的事件处理器,其定义了事件通知相关的逻辑,包含KeeperState和EventType两个枚举类,分别代表了通知状态和事件类型,同时定义了事件的回调方法:process(WatchedEvent event)

/**
 * @author wangyanqing
 * @date 2020/9/9
 * @description TODO zookeeper观察者
 */
public class ZkClientWatcher implements Watcher {
    // 集群连接地址
    private static final String CONNECT_ADDRES = "192.168.110.159:2181,192.168.110.160:2181,192.168.110.162:2181";
    // 会话超时时间
    private static final int SESSIONTIME = 2000;
    // 信号量,让zk在连接之前等待,连接成功后才能往下走.
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static String LOG_MAIN = "【main】 ";
    private ZooKeeper zk;

    public void createConnection(String connectAddres, int sessionTimeOut) {
        try {
            zk = new ZooKeeper(connectAddres, sessionTimeOut, this);
            System.out.println(LOG_MAIN + "zk 开始启动连接服务器....");
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public boolean createPath(String path, String data) {
        try {
            this.exists(path, true);
            this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println(LOG_MAIN + "节点创建成功, Path:" + path + ",data:" + data);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * 判断指定节点是否存在
     *
     * @param path
     *            节点路径
     */
    public Stat exists(String path, boolean needWatch) {
        try {
            return this.zk.exists(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    public boolean updateNode(String path,String data) throws KeeperException, InterruptedException {
        exists(path, true);
        this.zk.setData(path, data.getBytes(), -1);
        return false;
    }

    public void process(WatchedEvent watchedEvent) {

        // 获取事件状态
        Event.KeeperState keeperState = watchedEvent.getState();
        // 获取事件类型
        Event.EventType eventType = watchedEvent.getType();
        // zk 路径
        String path = watchedEvent.getPath();
        System.out.println("进入到 process() keeperState:" + keeperState + ", eventType:" + eventType + ", path:" + path);
        // 判断是否建立连接
        if (Event.KeeperState.SyncConnected == keeperState) {
            if (Event.EventType.None == eventType) {
                // 如果建立建立成功,让后程序往下走
                System.out.println(LOG_MAIN + "zk 建立连接成功!");
                countDownLatch.countDown();
            } else if (Event.EventType.NodeCreated == eventType) {
                System.out.println(LOG_MAIN + "事件通知,新增node节点" + path);
            } else if (Event.EventType.NodeDataChanged == eventType) {
                System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被修改....");
            }
            else if (Event.EventType.NodeDeleted == eventType) {
                System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被删除....");
            }

        }
        System.out.println("--------------------------------------------------------");
    }

    public static void main(String[] args) throws KeeperException, InterruptedException {
        ZkClientWatcher zkClientWatcher = new ZkClientWatcher();
        zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME);
//        boolean createResult = zkClientWatcher.createPath("/p15", "pa-644064");
        zkClientWatcher.updateNode("/pa2","7894561");
    }
}

使用Zookeeper实现负载均衡

思路:使用Zookeeper实现负载均衡原理,服务器端将启动的服务注册到,zk注册中心上,采用临时节点。客户端从zk节点上获取最新服务节点信息,本地使用负载均衡算法,随机分配服务器。

<dependencies>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.8</version>
        </dependency>
</dependencies>
/**
 * @author wangyanqing
 * @date 2020/9/9
 * @description TODO
 */
public class ZkServerSocket implements Runnable {
    private static int port = 18081;

    public static void main(String[] args) throws IOException {
        ZkServerSocket server = new ZkServerSocket(port);
        Thread thread = new Thread(server);
        thread.start();
    }

    public ZkServerSocket(int port) {
        this.port = port;
    }

    public void regServer() {
        // 向ZooKeeper注册当前服务器
        ZkClient client = new ZkClient("127.0.0.1:2181", 60000, 1000);
        String path = "/test/server" + port;
        if (client.exists(path))
            client.delete(path);
        client.createEphemeral(path, "127.0.0.1:" + port);
    }

    public void run() {
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(port);
            regServer();
            System.out.println("Server start port:" + port);
            Socket socket = null;
            while (true) {
                socket = serverSocket.accept();
                new Thread(new ServerHandler(socket)).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (serverSocket != null) {
                    serverSocket.close();
                }
            } catch (Exception e2) {

            }
        }
    }

}
/**
 * @author wangyanqing
 * @date 2020/9/9
 * @description TODO
 */
public class ZkServerClient {
    public static List<String> listServer = new ArrayList<String>();
    public static String parent = "/test";

    public static void main(String[] args) {
        initServer();
        ZkServerClient client = new ZkServerClient();
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            String name;
            try {
                name = console.readLine();
                if ("exit".equals(name)) {
                    System.exit(0);
                }
                client.send(name);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    // 注册所有server
    public static void initServer() {
        // listServer.add("127.0.0.1:18080");

        final ZkClient zkClient = new ZkClient("127.0.0.1:2181", 6000, 1000);
        List<String> children = zkClient.getChildren(parent);
        getChilds(zkClient, children);
        // 监听事件
        zkClient.subscribeChildChanges(parent, new IZkChildListener() {

            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                getChilds(zkClient, currentChilds);
            }
        });
    }

    private static void getChilds(ZkClient zkClient, List<String> currentChilds) {
        listServer.clear();
        for (String p : currentChilds) {
            String pathValue = (String) zkClient.readData(parent + "/" + p);
            listServer.add(pathValue);
        }
        serverCount = listServer.size();
        System.out.println("从zk读取到信息:" + listServer.toString());

    }

    // 请求次数
    private static int reqestCount = 1;
    // 服务数量
    private static int serverCount = 0;

    // 获取当前server信息
    public static String getServer() {
        // 实现负载均衡
        String serverName = listServer.get(reqestCount % serverCount);
        ++reqestCount;
        return serverName;
    }

    public void send(String name) {

        String server = ZkServerClient.getServer();
        String[] cfg = server.split(":");

        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);

            out.println(name);
            while (true) {
                String resp = in.readLine();
                if (resp == null)
                    break;
                else if (resp.length() > 0) {
                    System.out.println("Receive : " + resp);
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (out != null) {
                out.close();
            }
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
/**
 * @author wangyanqing
 * @date 2020/9/9
 * @description TODO
 */
public class ServerHandler  implements Runnable {
    private Socket socket;

    public ServerHandler(Socket socket) {
        this.socket = socket;
    }

    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            out = new PrintWriter(this.socket.getOutputStream(), true);
            String body = null;
            while (true) {
                body = in.readLine();
                if (body == null)
                    break;
                System.out.println("Receive : " + body);
                out.println("Hello, " + body);
            }

        } catch (Exception e) {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
            if (out != null) {
                out.close();
            }
            if (this.socket != null) {
                try {
                    this.socket.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
                this.socket = null;
            }
        }
    }
}

使用Zookeeper实现哨兵

哨兵解决方式是采用一个备用节点,这个备用节点定期给当前主节点发送ping包,主节点收到ping包后会向备用节点发送应答ack,当备用节点收到应答,就认为主节点还活着,让它继续提供服务,否则就认为主节点挂掉了,自己将开始行使主节点职责
image.png

     <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
/**
 * @author wangyanqing
 * @date 2020/9/11
 * @description TODO
 */
@Component
public class MyApplicationRunner implements ApplicationRunner {

    // 创建zk连接
    ZkClient zkClient = new ZkClient("127.0.0.1:2181");
    private String path = "/election";
    @Value("${server.port}")
    private String serverPort;

    public void run(ApplicationArguments args) throws Exception {
        System.out.println("项目启动完成...");
        createEphemeral();
        // 创建事件监听
        zkClient.subscribeDataChanges(path, new IZkDataListener() {

            // 节点被删除
            public void handleDataDeleted(String dataPath) throws Exception {
                // 主节点已经挂了,重新选举
                System.out.println("主节点已经挂了,重新开始选举");
                createEphemeral();
            }

            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        });

    }

    private void createEphemeral() {
        try {
            zkClient.createEphemeral(path, serverPort);
            ElectionMaster.isSurvival = true;
            System.out.println("serverPort:" + serverPort + ",选举成功....");
        } catch (Exception e) {
            ElectionMaster.isSurvival = false;
        }
    }

}
/**
 * @author wangyanqing
 * @date 2020/9/11
 * @description TODO
 */
public class IndexController {
    // 获取服务信息
    @RequestMapping("/getServerInfo")
    public String getServerInfo() {
        return ElectionMaster.isSurvival ? "当前服务器为主节点" : "当前服务器为从节点";
    }
}
/**
 * @author wangyanqing
 * @date 2020/9/11
 * @description TODO
 */
@Component
public class ElectionMaster {

    // 服务器info信息 是否存活
    public static boolean isSurvival;

}

Zookeeper实现分布式配置中心

什么是分布式配置中心

项目中配置文件比较繁杂,而且不同环境的不同配置修改相对频繁,每次发布都需要对应修改配置,如果配置出现错误,需要重新打包发布,时间成本较高,因此需要做统一的分布式注册中心,能做到自动更新配置文件信息,解决以上问题。

常用分布式配置中心框架

首选为disconf,可支持KV存储以及配置文件形式存储,使用和开发更为简便。并且本身也是基于zookpeer的分布式配置中心开发,方便部署使用,并且支持实时更新通知操作,但是部署相对复杂。
Diamond(daɪəmənd)基本可以放弃,一般做KV的存储配置项,做配置文件不是很好的选择。
Spring Cloud Config因为依赖git,使用局限性较大,需要在各个环境中安装git,并且不支持KV存储,功能方面略差于disconf

分布式配置中心实现原理

image.png

/**
 * @author wangyanqing
 * @date 2020/9/11
 * @description TODO
 */
@Component
public class MyApplicationRunner  implements ApplicationRunner {
    @Autowired
    private ConfigUtils configUtils;
    // 创建zk连接
    ZkClient zkClient = new ZkClient("127.0.0.1:2181");
    // 启动后执行方法
    public void run(ApplicationArguments args) throws Exception {
        System.out.println("项目启动成功...");
        String itmayieduValue = configUtils.getItmayieduKey();
        String itmayieduKey = "/itmayieduKey";
        try {
            // 创建节点信息
            zkClient.createEphemeral(itmayieduKey, itmayieduValue);
        } catch (Exception e) {
            e.printStackTrace();
        }
        zkClient.subscribeDataChanges(itmayieduKey, new IZkDataListener() {
            public void handleDataDeleted(String dataPath) throws Exception {
            }
            // 当值发生变化的时候
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println("dataPath:" + dataPath + ",data:" + data);
                final String strData = (String) data;
                configUtils.setItmayieduKey(strData);
            }
        });

    }

}
/**
 * @author wangyanqing
 * @date 2020/9/11
 * @description TODO
 */
@Data
@Component
public class ConfigUtils {
    @Value("${itmayiedu.key}")
    private String itmayieduKey;
}
/**
 * @author wangyanqing
 * @date 2020/9/11
 * @description TODO
 */
@RestController
public class IndexController {
    @Autowired
    private ConfigUtils configUtils;
    @Autowired
    private UpdateInfoService updateInfoService;

    @RequestMapping("/getInfo")
    public String getInfo() {
        return configUtils.getItmayieduKey();
    }

    @RequestMapping("/updateInfo")
    public String updateInfo(String key, String value) {
        String updateInfo = updateInfoService.updateInfo(key, value);
        return updateInfo;
    }
}

/**
 * @author wangyanqing
 * @date 2020/9/11
 * @description TODO
 */
@Service
public class UpdateInfoService  {
    // 创建zk连接
    ZkClient zkClient = new ZkClient("127.0.0.1:2181");
    public String updateInfo(String key, String value) {
        try {
            zkClient.writeData("/" + key, value);
            return "success";
        } catch (Exception e) {
            return "fail";
        }
    }
}

Zookeeper集群选举策略

Zookeeper集群选举原理

Zookeeper的角色

1.领导者(leader),负责进行投票的发起和决议,更新系统状态
2.学习者(learner),包括跟随者(follower)和观察者(observer),follower用于接受客户端请求并想客户端返回结果,在选主过程中参与投票
3.Observer可以接受客户端连接,将写请求转发给leader,但observer不参加投票过程,只同步leader的状态,observer的目的是为了扩展系统,提高读取速度
4.客户端(client),请求发起方Zookeeper的核心是原子广播,这个机制保证了各个Server之间的同步。实现这个机制的协议叫做Zab协议。Zab协议有两种模式,它们分别是恢复模式(选主)和广播模式(同步)。当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,当领导者被选举出来,且大多数Server完成了和leader的状态同步以后,恢复模式就结束了。状态同步保证了leader和Server具有相同的系统状态。

为了保证事务的顺序一致性,zookeeper采用了递增的事务id号(zxid)来标识事务。所有的提议(proposal)都在被提出的时候加上了zxid。实现中zxid是一个64位的数字,它高32位是epoch用来标识leader关系是否改变,每次一个leader被选出来,它都会有一个新的epoch,标识当前属于那个leader的统治时期。低32位用于递增计数。

Zookeeper的读写机制

 » Zookeeper是一个主多个server组成的集群
 » 一个leader,多个follower
 » 每个server保存一份数据副本
 » 全局数据一致
 » 分布式读写
 » 更新请求转发,由leader实施

Zookeeper的保证

» 更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行
» 数据更新原子性,一次数据更新要么成功,要么失败
» 全局唯一数据视图,client无论连接到哪个server,数据视图都是一致的
» 实时性,在一定事件范围内,client能读到最新数据

Zookeeper节点数据操作流程

半数通过
    – 3台机器 挂一台 2>3/2
    – 4台机器 挂2台 2!>4/2
 • A提案说,我要选自己,B你同意吗?C你同意吗?B说,我同意选A;C说,我同意选A。(注意,这里超过半数了,其实在现实世界选举已经成功了。

但是计算机世界是很严格,另外要理解算法,要继续模拟下去。)
  • 接着B提案说,我要选自己,A你同意吗;A说,我已经超半数同意当选,你的提案无效;C说,A已经超半数同意当选,B提案无效。
  • 接着C提案说,我要选自己,A你同意吗;A说,我已经超半数同意当选,你的提案无效;B说,A已经超半数同意当选,C的提案无效。
• 选举已经产生了Leader,后面的都是follower,只能服从Leader的命令。而且这里还有个小细节,就是其实谁先启动谁当头。

1.安装jdk1.8.0_181运行jdk环境
2.安装jdk1.8.0_181环境变量(命令行执行)

vi /etc/profile

export JAVA_HOME=/usr/local/jdk1.8.0_181
export ZOOKEEPER_HOME=/usr/local/zookeeper
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$PATH

#立即生效
source /etc/profile

#关闭防火墙
systemctl stop firewalld
service  iptables stop

#wget
yum -y install wget
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz

#解压Zookeeper安装包
tar -zxvf zookeeper-3.4.10.tar.gz 

#修改Zookeeper文件夹名称
mv zookeeper-3.4.10 zookeeper

cd /usr/local/zookeeper/conf
mv zoo_sample.cfg zoo.cfg


cd /usr/local/zookeeper/conf
mv zoo_sample.cfg  zoo.cfg


#修改conf: vi zoo.cfg 修改两处
(1) dataDir=/usr/local/zookeeper/data(注意同时在zookeeper创建data目录)
(2)最后面添加
server.0=192.168.212.154:2888:3888
server.1=192.168.212.156:2888:3888
server.2=192.168.212.157:2888:3888


#创建服务器标识
#服务器标识配置:
#创建文件夹: 
mkdir data
创建文件myid并填写内容为0: vi
myid (内容为服务器标识 : 0)

#复制zookeeper
进行复制zookeeper目录到hadoop01和hadoop02
#还有/etc/profile文件
把hadoop01、 hadoop02中的myid文件里的值修改为1和2
路径(vi /usr/local/zookeeper/data/myid)
启动zookeeper
路径: /usr/local/zookeeper/bin
执行: zkServer.sh start
(注意这里3台机器都要进行启动)
状态: zkServer.sh 
status(在三个节点上检验zk的mode,一个leader和俩个follower)