概述

  1. Zookeeper 是一个开源的分布式的,为分布式应用提供协调服务的 Apache 项目。

工作机制

image.png

特点

image.png

  1. 1Zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。
  2. 2)集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。
  3. 3)全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。
  4. 4)更新请求顺序进行,来自同一个Client的更新请求按其发送顺序依次执行。
  5. 5)数据更新原子性,一次数据更新要么成功,要么失败。
  6. 6)实时性,在一定时间范围内,Client能读到最新数据。

数据结构

image.png

  1. ZooKeeper数据模型的结构与Unix文件系统很类似,整体上可以看作是一棵树,
  2. 每个节点称做一个ZNode。每一个ZNode默认能够存储1MB的数据,每个ZNode都可以通过其路径唯一标识。

应用场景

  1. 提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。

统一命名服务

image.png

  1. 在分布式环境下,经常需要对应用/服务进行统一命名,便于识别。
  2. 例如:IP不容易记住,而域名容易记住。

统一配置管理

image.png

  1. 1.分布式环境下,配置文件同步非常常见。
  2. 1)一般要求一个集群中,所有节点的配置信息是 一致的,比如 Kafka 集群。
  3. 2)对配置文件修改后,希望能够快速同步到各个节点上。
  4. 2.配置管理可交由ZooKeeper实现。
  5. 1)可将配置信息写入ZooKeeper上的一个Znode
  6. 2)各个客户端服务器监听这个Znode
  7. 3)一旦Znode中的数据被修改,ZooKeeper将通知各个客户端服务器。

统一集群管理

image.png

  1. 1.分布式环境中,实时掌握每个节点的状态是必要的。
  2. 1)可根据节点实时状态做出一些调整。
  3. 2.ZooKeeper可以实现实时监控节点状态变化
  4. 1)可将节点信息写入ZooKeeper上的一个ZNode
  5. 2)监听这个ZNode可获取它的实时状态变化。

服务器动态上下线

image.png

软负载均衡

image.png

  1. Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求

Zookeeper安装

  1. 1.安装前准备
  2. 1)安装 Jdk
  3. 2)拷贝 Zookeeper 安装包到 Linux 系统下
  4. 3)解压到指定目录
  5. tar -zxvf zookeeper3.4.10.tar.gz -C /opt/module/
  6. 2.配置修改
  7. 1)将/opt/module/zookeeper-3.4.10/conf 这个路径下的 zoo_sample.cfg 修改为 zoo.cfg
  8. mv zoo_sample.cfg zoo.cfg
  9. 2)打开 zoo.cfg 文件,修改 dataDir 路径
  10. vim zoo.cfg
  11. 修改如下内容:
  12. dataDir=/opt/module/zookeeper-3.4.10/zkData
  13. 3)在/opt/module/zookeeper-3.4.10/这个目录上创建 zkData 文件夹
  14. mkdir zkData
  15. 3.操作 Zookeeper
  16. 1)启动 Zookeeper
  17. bin/zkServer.sh start
  18. 2)查看进程是否启动
  19. jps - QuorumPeerMain
  20. 3)查看状态
  21. bin/zkServer.sh status
  22. 4)启动客户端
  23. bin/zkCli.sh
  24. 5)退出客户端
  25. quit
  26. 6)停止 Zookeeper
  27. bin/zkServer.sh stop

配置参数

  1. Zookeeper中的配置文件zoo.cfg中参数含义解读如下:
  2. 1tickTime =2000:通信心跳数,Zookeeper 服务器与客户端心跳时间,
  3. 单位毫秒 Zookeeper使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,
  4. 也就是每个tickTime时间就会发送一个心跳,时间单位为毫秒。
  5. 它用于心跳机制,并且设置最小的session超时时间为两倍心跳时间。(session的最小超时时间是2*tickTime)
  6. 2initLimit =10LF 初始通信时限 集群中的Follower跟随者服务器与Leader领导者服务器之间初始连接时
  7. 能容忍的最多心跳数(tickTime的数量),用它来限定集群中的Zookeeper服务器连接到Leader的时限。
  8. 3syncLimit =5LF 同步通信时限 集群中LeaderFollower之间的最大响应时间单位,
  9. 假如响应超过syncLimit * tickTimeLeader认为Follwer死掉,从服务器列表中删除Follwer
  10. 4dataDir:数据文件目录+数据持久化路径 主要用于保存 Zookeeper 中的数据。
  11. 5clientPort =2181:客户端连接端口 监听客户端连接的端口。

Zookeeper原理

选举机制

  1. 1)半数机制:集群中半数以上机器存活,集群可用。所以 Zookeeper 适合安装奇数台服务器。
  2. 2Zookeeper 虽然在配置文件中并没有指定 Master Slave。但是,Zookeeper 工作时,
  3. 是有一个节点为 Leader,其他则为 FollowerLeader 是通过内部的选举机制临时产生的。
  4. 3)以一个简单的例子来说明整个选举的过程。
  5. 假设有五台服务器组成的 Zookeeper 集群,它们的 id 1-5,同时它们都是最新启动的,
  6. 也就是没有历史数据,在存放数据量这一点上,都是一样的。
  7. 假设这些服务器依序启动,来看看会发生什么,如下图:

image.png

  1. 1)服务器 1 启动,发起一次选举。服务器 1 投自己一票。此时服务器 1 票数一票,
  2. 不够半数以上(3 票),选举无法完成,服务器 1 状态保持为 LOOKING
  3. 2)服务器 2 启动,再发起一次选举。服务器 1 2 分别投自己一票并交换选票信息:
  4. 此时服务器 1 发现服务器 2 ID 比自己目前投票推举的(服务器 1)大,更改选票为推举
  5. 服务器 2。此时服务器 1 票数 0 票,服务器 2 票数 2 票,没有半数以上结果,选举无法完成,
  6. 服务器 12 状态保持 LOOKING
  7. 3)服务器 3 启动,发起一次选举。此时服务器 1 2 都会更改选票为服务器 3。此
  8. 次投票结果:服务器 1 0 票,服务器 2 0 票,服务器 3 3 票。此时服务器 3 的票数已
  9. 经超过半数,服务器 3 当选 Leader。服务器 12 更改状态为 FOLLOWING,服务器 3 更改
  10. 状态为 LEADING
  11. 4)服务器 4 启动,发起一次选举。此时服务器 123 已经不是 LOOKING 状态,
  12. 不会更改选票信息。交换选票信息结果:服务器 3 3 票,服务器 4 1 票。此时服务器 4
  13. 服从多数,更改选票信息为服务器 3,并更改状态为 FOLLOWING
  14. 5)服务器 5 启动,同 4 一样当小弟。

节点类型

image.png

Stat 结构体

  1. 1czxid-创建节点的事务 zxid 每次修改ZooKeeper状态都会收到一个zxid形式的时间戳,
  2. 也就是ZooKeeper事务ID 事务 ID ZooKeeper 中所有修改总的次序。每个修改都有唯一的 zxid
  3. 如果 zxid1 小于 zxid2,那么 zxid1 zxid2 之前发生。
  4. 2ctime - znode 被创建的毫秒数(从 1970 年开始)
  5. 3mzxid - znode 最后更新的事务 zxid
  6. 4mtime - znode 最后修改的毫秒数(从 1970 年开始)
  7. 5pZxid-znode 最后更新的子节点 zxid
  8. 6cversion - znode 子节点变化号,znode 子节点修改次数
  9. 7dataversion - znode 数据变化号
  10. 8aclVersion - znode 访问控制列表的变化号
  11. 9ephemeralOwner- 如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0
  12. 10dataLength- znode 的数据长度
  13. 11numChildren - znode 子节点数量

监听器原理

image.png

写数据流程

image.png

API使用

  1. 添加 pom 文件
  2. <dependencies>
  3. <dependency>
  4. <groupId>junit</groupId>
  5. <artifactId>junit</artifactId>
  6. <version>RELEASE</version>
  7. </dependency>
  8. <dependency>
  9. <groupId>org.apache.logging.log4j</groupId>
  10. <artifactId>log4j-core</artifactId>
  11. <version>2.8.2</version>
  12. </dependency>
  13. <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zook eeper -->
  14. <dependency>
  15. <groupId>org.apache.zookeeper</groupId>
  16. <artifactId>zookeeper</artifactId>
  17. <version>3.4.10</version>
  18. </dependency>
  19. </dependencies>
  20. 拷贝 log4j.properties 文件到项目根目录
  21. 需要在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在 文件中填入。
  22. log4j.rootLogger=INFO, stdout
  23. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  24. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  25. log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
  26. log4j.appender.logfile=org.apache.log4j.FileAppender
  27. log4j.appender.logfile.File=target/spring.log
  28. log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
  29. log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
  30. 创建 ZooKeeper 客户端
  31. private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
  32. private static int sessionTimeout = 2000;
  33. private ZooKeeper zkClient = null;
  34. @Before
  35. public void init() throws Exception {
  36. zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher(){
  37. @Override
  38. public void process(WatchedEvent event){
  39. // 收到事件通知后的回调函数(用户的业务逻辑)
  40. System.out.println(event.getType() + "--" + event.getPath());
  41. // 再次启动监听
  42. try {
  43. zkClient.getChildren("/", true);
  44. }catch (Exception e){
  45. e.printStackTrace();
  46. }
  47. }
  48. });
  49. }
  50. 创建子节点
  51. // 创建子节点
  52. @Test
  53. public void create() throws Exception{
  54. // 参数 1:要创建的节点的路径; 参数 2:节点数据 ; 参数 3:节点权 限 ;参数 4:节点的类型
  55. String nodeCreated = zkClient.create("/atguigu", "jinlian".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  56. }
  57. 获取子节点并监听节点变化
  58. // 获取子节点
  59. @Test
  60. public void getChildren() throws Exception{
  61. List<String> children = zkClient.getChildren("/", true);
  62. for (String child : children){
  63. System.out.println(child);
  64. }
  65. // 延时阻塞
  66. Thread.sleep(Long.MAX_VALUE);
  67. }
  68. 判断 Znode 是否存在
  69. // 判断 znode 是否存在
  70. @Test
  71. public void exist() throws Exception{
  72. Stat stat = zkClient.exists("/eclipse", false);
  73. System.out.println(stat == null ? "not exist" : "exist");
  74. }

Zookeeper 实战

xsync同步脚本(需要基于hadoop)

  1. 先安装rsync
  2. rsync主要用于备份和镜像,具有速度快、避免复制相同内容和支持符号链接的优点。
  3. rsyncscp区别
  4. 1.rsync做文件的复制要比scp的速度快,rsync只对差异文件做更新。
  5. 2.scp是把所有文件都复制过去。
  6. yum -y install rsync
  7. 基本语法
  8. rsync -rvl $pdir/$fname $user@hadoop$host:$pdir/$fname
  9. 命令 选项参数 要拷贝的文件路径/名称 目的用户@主机:目的路径/名称
  10. -r 归档拷贝
  11. -v 显示复制过程
  12. -l 拷贝符号链接
  13. 1.创建xshync脚本
  14. <!---创建文件夹--->
  15. [lovefo@hadoop102 ~]$ mkdir bin
  16. [lovefo@hadoop102 ~]$ cd bin/
  17. [lovefo@hadoop102 bin]$ touch xsync
  18. [lovefo@hadoop102 bin]$ vi xsync
  19. 2.编写内容:
  20. #! /bin/bash
  21. # $#:表示传递给脚本或函数的参数个数。
  22. #1 获取输入参数的个数,如果没有参数直接退出
  23. pcount=$#
  24. if((pcount==0));then
  25. echo no args;
  26. exit;
  27. fi
  28. #2 获取文件名称
  29. p1=$1
  30. fname='basename $p1'
  31. echo fname=$fname
  32. #3 获取上级目录到绝对路径
  33. pdir='cd -p $(dirname $p1);pwd'
  34. echo pdir=$pdir
  35. #4 获取当前用户的名称
  36. user='whoami'
  37. #5循环
  38. for((host=103;host<105;host++));do
  39. # hadoop$host
  40. echo -----------hadoop$host-----------
  41. rsync -rvl $pdir/$fname $user@hadoop$host:$pdir
  42. done
  43. 3.修改脚本xsync具有执行权限
  44. chmod 777 xsync
  45. 4.调用脚本形式
  46. xsync 文件名称

分布式安装部署

  1. 1.集群规划
  2. hadoop102hadoop103 hadoop104 三个节点上(奇数)部署 Zookeeper
  3. 以下操作都在hadoop102上操作即可。
  4. 2.解压安装
  5. 1)解压 Zookeeper 安装包到/opt/module/目录下
  6. tar -zxvf zookeeper3.4.10.tar.gz -C /opt/module/
  7. 2)同步/opt/module/zookeeper-3.4.10 目录内容到 hadoop103hadoop104
  8. xsync zookeeper-3.4.10/
  9. 3.配置服务器编号
  10. 1)在/opt/module/zookeeper-3.4.10/这个目录下创建 zkData
  11. mkdir -p zkData
  12. 2)在/opt/module/zookeeper-3.4.10/zkData 目录下创建一个 myid 的文件
  13. touch myid 注意:添加 myid 文件,注意一定要在 linux 里面创建,在 notepad++里面很可能乱码
  14. 3)编辑 myid 文件
  15. vi myid
  16. 在文件中添加与 server 对应的编号:
  17. 2
  18. 4)拷贝配置好的 zookeeper 到其他机器上
  19. xsync myid
  20. 并分别在 hadoop103hadoop104 上修改 myid 文件中内容为 34
  21. 4.配置 zoo.cfg 文件
  22. 1)重命名/opt/module/zookeeper-3.4.10/conf 这个目录下的 zoo_sample.cfg zoo.cfg
  23. mv zoo_sample.cfg zoo.cfg
  24. 2)打开 zoo.cfg 文件
  25. vim zoo.cfg
  26. 修改数据存储路径配置
  27. dataDir=/opt/module/zookeeper-3.4.10/zkData
  28. 增加如下配置
  29. #######################cluster##########################
  30. server.2=hadoop102:2888:3888
  31. server.3=hadoop103:2888:3888
  32. server.4=hadoop104:2888:3888
  33. 3)同步 zoo.cfg 配置文件
  34. xsync zoo.cfg
  35. 4)配置参数解读
  36. server.A=B:C:D
  37. A 是一个数字,表示这个是第几号服务器;
  38. 集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据就是 A 的值,
  39. Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是哪个 server
  40. B 是这个服务器的地址;
  41. C 是这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;
  42. D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader
  43. 而这个端口就是用来执行选举时服务器相互通信的端口。
  44. 5.集群操作
  45. 1)分别启动 Zookeeper
  46. 进入到Zookeeper目录,分别启动各个服务器的Zookeeper
  47. 服务器1bin/zkServer.sh start
  48. 服务器2bin/zkServer.sh start
  49. 服务器3bin/zkServer.sh start
  50. 2)查看状态
  51. 服务器1bin/zkServer.sh status
  52. 服务器2bin/zkServer.sh status
  53. 服务器3bin/zkServer.sh status

客户端命令行操作

image.png

  1. 1.启动客户端
  2. bin/zkCli.sh
  3. 2.显示所有操作命令
  4. help
  5. 3.查看当前 znode 中所包含的内容
  6. ls /
  7. 4.查看当前节点详细数据
  8. ls2 /
  9. 5.分别创建 2 个普通节点
  10. create /sanguo "jinlian"
  11. create /sanguo/shuguo "liubei"
  12. 6.获得节点的值
  13. get /sanguo
  14. get /sanguo/shuguo
  15. 7.创建短暂节点
  16. create -e /sanguo/wuguo "zhouyu"
  17. 1)在当前客户端是能查看到的
  18. ls /sanguo
  19. 2)退出当前客户端然后再重启客户端
  20. quit
  21. bin/zkCli.sh
  22. 3)再次查看根目录下短暂节点已经删除
  23. ls /sanguo
  24. 8.创建带序号的节点
  25. 1)先创建一个普通的根节点/sanguo/weiguo
  26. create /sanguo/weiguo "caocao"
  27. 2)创建带序号的节点
  28. create -s /sanguo/weiguo/xiaoqiao "jinlian"
  29. create -s /sanguo/weiguo/daqiao "jinlian"
  30. create -s /sanguo/weiguo/diaocan "jinlian"
  31. 如果原来没有序号节点,序号从 0 开始依次递增。如果原节点下已有 2 个节点,则再排序时从 2 开始,以此类推。
  32. 9.修改节点数据值
  33. set /sanguo/weiguo "simayi"
  34. 10.节点的值变化监听
  35. 1)在 hadoop104 主机上注册监听/sanguo 节点数据变化
  36. get /sanguo watch
  37. 2)在 hadoop103 主机上修改/sanguo 节点的数据
  38. set /sanguo "xisi"
  39. 3)观察 hadoop104 主机收到数据变化的监听
  40. 显示节点数据有变化了
  41. 11.节点的子节点变化监听(路径变化)
  42. 1)在 hadoop104 主机上注册监听/sanguo 节点的子节点变化
  43. ls /sanguo watch
  44. 2)在 hadoop103 主机/sanguo 节点上创建子节点
  45. create /sanguo/jin "simayi"
  46. 3)观察 hadoop104 主机收到子节点变化的监听
  47. 显示节点数据有变化了
  48. 12.删除节点
  49. delete /sanguo/jin
  50. 13.递归删除节点
  51. rmr /sanguo/shuguo
  52. 14.查看节点状态
  53. stat /sanguo