一、引言


在分布式环境下,如果舍弃SpringCloud,使用其他的分布式框架,那么注册心中,配置集中管理,集群管理,分布式锁,分布式任务,队列的管理想单独实现怎么办。

二、Zookeeper介绍


Zookeeper本身是Hadoop生态园的中的一个组件,Zookeeper强大的功能,在Java分布式架构中,也会频繁的使用到Zookeeper。
作者图片:doug
Zookeeper就是一个文件系统 + 监听通知机制

三、Zookeeper安装


  1. docker-compose.yml
  2. version: "3.1"
  3. services:
  4. zk:
  5. image: daocloud.io/daocloud/zookeeper:latest
  6. restart: always
  7. container_name: zk
  8. ports:
  9. - 2181:2181

四、Zookeeper架构【重点】


4.1 Zookeeper的架构图

  • 每一个节点都没称为znode
  • 每一个znode中都可以存储数据
  • 节点名称是不允许重复的 | Zookeeper的架构图 | | —- | | 图片.png |

4.2 znode类型

四种Znode

  • 持久节点:永久的保存在你的Zookeeper
  • 持久有序节点:永久的保存在你的Zookeeper,他会给节点添加一个有序的序号。 /xx -> /xx0000001
  • 临时节点:当存储的客户端和Zookeeper服务断开连接时,这个临时节点自动删除
  • 临时有序节点:当存储的客户端和Zookeeper服务断开连接时,这个临时节点自动删除,他会给节点添加一个有序的序号。 /xx -> /xx0000001

    4.3 Zookeeper的监听通知机制

    客户端可以去监听Zookeeper中的Znode节点。
    Znode改变时,会通知监听当前Znode的客户端
监听通知机制
图片.png

五、Zookeeper常用命令


Zookeeper针对增删改查的常用命令

  1. # 查询当前节点下的全部子节点
  2. ls 节点名称
  3. # 例子 ls /
  4. # 查询当前节点下的数据
  5. get 节点名称
  6. # 例子 get /zookeeper
  7. # 创建节点
  8. create [-s] [-e] znode名称 znode数据
  9. # -ssequence,有序节点
  10. # -eephemeral,临时节点
  11. # 修改节点值
  12. set znode名称 新数据
  13. # 删除节点
  14. delete znode名称 # 没有子节点的znode
  15. rmr znode名称 # 删除当前节点和全部的子节点

六、Zookeeper集群【重点】


6.1 Zookeeper集群架构图

集群架构图
图片.png

6.2 Zookeeper集群中节点的角色

  • Leader:Master主节点
  • Follower (默认的从节点):从节点,参与选举全新的Leader
  • Observer:从节点,不参与投票
  • Looking:正在找Leader节点

    6.3 Zookeeper投票策略

  • 每一个Zookeeper服务都会被分配一个全局唯一的myid,myid是一个数字。

  • Zookeeper在执行写数据时,每一个节点都有一个自己的FIFO的队列。保证写每一个数据的时候,顺序是不会乱的,Zookeeper还会给每一个数据分配一个全局唯一的zxid,数据越新zxid就越大。

选举Leader:

  • 选举出zxid最大的节点作为Leader。
  • 在zxid相同的节点中,选举出一个myid最大的节点,作为Leader。

    6.4 搭建Zookeeper集群

    yml文件
    1. version: "3.1"
    2. services:
    3. zk1:
    4. image: zookeeper
    5. restart: always
    6. container_name: zk1
    7. ports:
    8. - 2181:2181
    9. environment:
    10. ZOO_MY_ID: 1
    11. ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
    12. zk2:
    13. image: zookeeper
    14. restart: always
    15. container_name: zk2
    16. ports:
    17. - 2182:2181
    18. environment:
    19. ZOO_MY_ID: 2
    20. ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
    21. zk3:
    22. image: zookeeper
    23. restart: always
    24. container_name: zk3
    25. ports:
    26. - 2183:2181
    27. environment:
    28. ZOO_MY_ID: 3
    29. ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181

    七、Java操作Zookeeper


7.1 Java连接Zookeeper

创建Maven工程
导入依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.zookeeper</groupId>
  4. <artifactId>zookeeper</artifactId>
  5. <version>3.6.0</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.curator</groupId>
  9. <artifactId>curator-recipes</artifactId>
  10. <version>4.0.1</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>junit</groupId>
  14. <artifactId>junit</artifactId>
  15. <version>4.12</version>
  16. </dependency>
  17. </dependencies>

编写连接Zookeeper集群的工具类

public class ZkUtil {

    public static CuratorFramework cf(){
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,2);
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString("192.168.199.109:2181,192.168.199.109:2182,192.168.199.109:2183")
                .retryPolicy(retryPolicy)
                .build();

        cf.start();

        return cf;
    }

}

测试类

测试

7.2 Java操作Znode节点

查询

public class Demo2 {

    CuratorFramework cf = ZkUtil.cf();

    // 获取子节点
    @Test
    public void getChildren() throws Exception {
        List<String> strings = cf.getChildren().forPath("/");

        for (String string : strings) {
            System.out.println(string);
        }
    }

    // 获取节点数据
    @Test
    public void getData() throws Exception {
        byte[] bytes = cf.getData().forPath("/qf");
        System.out.println(new String(bytes,"UTF-8"));
    }

}
添加
@Test
public void create() throws Exception {
    cf.create().withMode(CreateMode.PERSISTENT).forPath("/qf2","uuuu".getBytes());
}
修改
@Test
public void update() throws Exception {
    cf.setData().forPath("/qf2","oooo".getBytes());
}
删除
@Test
public void delete() throws Exception {
    cf.delete().deletingChildrenIfNeeded().forPath("/qf2");
}
查看znode的状态
@Test
public void stat() throws Exception {
    Stat stat = cf.checkExists().forPath("/qf");
    System.out.println(stat);
}

7.3 监听通知机制

实现方式

public class Demo3 {

    CuratorFramework cf = ZkUtil.cf();


    @Test
    public void listen() throws Exception {
        //1. 创建NodeCache对象,指定要监听的znode
        NodeCache nodeCache = new NodeCache(cf,"/qf");
        nodeCache.start();

        //2. 添加一个监听器
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                byte[] data = nodeCache.getCurrentData().getData();
                Stat stat = nodeCache.getCurrentData().getStat();
                String path = nodeCache.getCurrentData().getPath();

                System.out.println("监听的节点是:" + path);
                System.out.println("节点现在的数据是:" + new String(data,"UTF-8"));
                System.out.println("节点状态是:" + stat);

            }
        });

        System.out.println("开始监听!!");
        //3. System.in.read();
        System.in.read();
    }

}