Zookeeper 简介

zookeeper 是一个开源的分布式的,为分布式应用提供协调服务的Apache项目。 在大数据技术生态圈中:Zookeeper-动物管理员、Hadoop-大象、Hive-蜜蜂、Pig-猪等技术。

zookeeper 是一个基于观察者模式(一个人干活,有人盯着他)设计的分布式服务管理框架
Zookeeper负责 存储 和管理 数据

  • 接受观察者的注册,一旦这些数据发生变化
  • Zookeeper就将负责通知已经注册的那些观察者做出相应的反应
  • 从而实现集群中类似Master/Slave管理模式

Zookeeper的工作机制:
Zookeeper = 文件系统 + 监听通知机制

  1. 商家营业入驻 注册到Zookeeper
  2. 获取到当前营业的饭店列表
  3. 服务器节点下线
  4. 服务器节点上下线事件通知
  5. 重新再去获取服务器列表,并注册监听

image.png

Zookeeper 特点

无论分布式和集群都是很多人在做事情,具体区别如下:
例如:小明有一个饭店,招聘多个工作人员
分布式:招聘1个厨师,1个服务员,1个前台,三个人负责的工作不一样,但最终的目的是为饭店工作,例如一个淘宝项目:分为订单系统、商品系统、用户系统,三个系统的工作不一样,但是都是作为一个淘宝商城的项目合为一个。
集群:招聘3个服务员,3个人的工作一致。例如淘宝项目,订单系统的订单量太大了,这时候就会给订单系统提供多台服务器,这就是集群
image.png

  1. 一个leader和多个follower来组成的集群
  2. 集群中只要有半数以上的节点存活,Zookeeper就能正常工作(5台服务器挂2台没问题;4台服务器挂2台,就停止),一般会使用奇数台服务器
  3. 全局数据一致性,每台服务器都保存一份相同的数据副本,无论client连接哪台Server,数据都是一致的
  4. 数据更新原子性,一次数据要么成功,要么失败
  5. 实时性,在一定时间范围内,client能读取到最新数据
  6. 更新的请求按照顺序执行,会按照发送过来的顺序,逐一执行

    数据结构

    Zookeeper数据模型的结构与Linux文件系统类似,整体上可以看作是一棵树,每个节点成做一个ZNode(ZookeeperNode)
    每个ZNode默认能够存储1MB的数据(元数据),每个ZNode的路径都是唯一的。
    元数据(Metadata):又称中介数据,中继数据,为描述数据的数据,主要描述数据属性的信息,用来支持指示:存储位置、历史数据、资源查找、文件记录等功能

应用场景

zookeeper提供服务包括:统一命名服务、统一配置管理、统一集群管理,服务器节点动态上下线、软负载均衡等

统一命名服务

在分布式场景下,通常需要对应用或服务进行统一的命名,便于识别
image.png

统一配置管理

在分布式环境下,配置文件做同步是必经之路,例如1000台服务器,如果配置文件作出修改,那一台一台的修改肯定不行,Zookeeper可以做到一处修改快速同步到每台服务器上

  1. 将配置信息写入到Zookeeper的某个节点上
  2. 每个客户端都监听这个节点
  3. 一旦节点中的数据文件被修改,Zookeeper就会通知每台客户端服务器

image.png

服务器节点动态上下线

客户端能够实时获取服务器上下线的变化
image.png

软负载均衡

Zookeeper会记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户请求
image.png

Zookeeper 本地模式安装

Zookeeper镜像库地址:本篇文章使用的是3.6.0的版本
apache-zookeeper-3.6.0-bin.tar.gz bin已经自带所需要的各种jar包
apache-jmeter-5.3.zip
需要在Linux虚拟机中进行安装,首先需要安装jdk

  1. 将Zookeeper包拷贝到opt目录下,解压:tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz 重命名为:zookeeper
  2. 配置修改 ```shell 复制配置文件:zoo_sample.cfg cp zoo_sample.cfg zoo.cfg vi zoo.cfg

修改配置 修改dataDir目录

dataDir=/opt/zookeeper/ZkData dataLogDir=/opt/zookeeper/zkLog

创建目录:ZkData zkLog

mkdir ZkData mkdir zkLog

  1. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/375694/1611640507876-bb6a35bf-1c1b-4ce2-ad2c-6fd5e2a81a34.png#height=185&id=hmEDR&margin=%5Bobject%20Object%5D&name=image.png&originHeight=370&originWidth=874&originalType=binary&ratio=1&size=181410&status=done&style=none&width=437)<br />这样Zookeeper安装就完成了。<br />启动Zookeeper:`zkServer.sh`
  2. `./zkServer.sh start`<br />注意 8080端口释放出来,否则zookeeper启动会出现错误<br />不能和Tomcat部署在同一台机器上,因为zookeepertomcat同样默认使用的都是8080端口;<br />修改端口,如果必须部署到一台服务器上,则选择修改tomcat的端口号,将tomcat8080释放出来;
  3. 停止Zookeeper `./zkServer.sh stop`
  4. 查看Zookeeper的状态:`./zkServer.sh status`可以看到zookeeper的端口是2181
  5. ```shell
  6. [root@localhost bin]# ./zkServer.sh status
  7. ZooKeeper JMX enabled by default
  8. Using config: /opt/zookeeper/bin/../conf/zoo.cfg
  9. Client port found: 2181. Client address: localhost.
  10. Mode: standalone
  11. # 注意如果出现如下错误,检查配置文件中配置的文件是否创建;第二查看8080端口是否
  12. # 被占用,如果被占用记得释放8080端口
  13. Error contacting service. It is probably not running.

启动客户端:./zkCli.sh 退出客户端输入quit 即可

配置参数详解:

Zookeeper中的配置文件zoo.cfg中参数含义解读如下:

tickTime =2000:通信心跳数,Zookeeper服务器与客户端心跳时间,单位毫秒
Zookeeper使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就
是每个tickTime时间就会发送一个心跳,时间单位为毫秒。 

initLimit =10:LF初始通信时限
集群中的Follower跟随者服务器与Leader领导者服务器之间,启动时能容忍的最多心跳数 10*2000(10个心跳时间)如果领导和跟随者没有发出心跳通信,就视为失效的连接,领导 和跟随者彻底断开

syncLimit =5:LF同步通信时限
集群启动后,Leader与Follower之间的最大响应时间单位,假如响应超过syncLimit *
tickTime->10秒,Leader就认为Follwer已经死掉,会将Follwer从服务器列表中删除 

dataDir:数据文件目录+数据持久化路径
主要用于保存Zookeeper中的数据。 

dataLogDir:日志文件目录

clientPort =2181:客户端连接端口 监听客户端连接的端口。

Zookeeper 内部原理

选举机制(面试常问)

  • 半数机制:集群中半数以上机器存活,集群可用。所以Zookeeper适合安装奇数台服务器
  • 虽然在配置文件中并没有指定Master和Slave,但在Zookeeper工作时,有一个节点为Leader,其他则为Follower,Leader是通过内部选举机制临时产生的

image.png

  1. Server1 先投票给自己,自己得1票,没有超过半数,根本无法成为Leader,那么Server就会把自己的票数(1票)顺水退舟投给了id比自己大的Server2
  2. Server2也把自己的票数投给自己,再加上Server1给的票数,总票数为2票没有超过半数,无法成为Leader,那么Server2将自己的票数(2票)投给了id比自己大的Server3
  3. Server3得到了Server1和Server2的两票,在加上自己投给自己的一票,3票超过半数,成为leader,既然Server3成为了Leader那么就没有必要把自己的票数投给Server4了
  4. Server4和Server5都投给自己,但是无法改变Server3的票数,只好听天由命,承认Server3是leader

    节点类型

  • 持久型 persustent
    • 持久化目录节点:客户端与Zookeeper断开连接后,该节点依旧存在
    • 持久化顺序编号目录节点:客户端与Zookeeper断开连接后,该节点依旧存在,创建znode设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护:Znode001,Znode002
  • 短暂型 ephemeral
    • 临时目录节点:客户端与服务器端断开连接后,创建的节点自动删除
    • 临时顺序编号目录节点:客户端与Zookeeper断开连接后,该节点被删除,创建znode设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护:Znode001,Znode002

注意:序号是相当于i++和数据库中的自增长类似

监听器原理(面试常问)

image.png

  1. 在main方法中创建Zookeeper客户端的同时就会创建两个线程,一个负责网络连接通信,一个负责监听
  2. 监听事件就会通过网络通信发送到Zookeeper
  3. Zookeeper获得注册的监听事件后,立刻将监听事件添加到监听列表里
  4. Zookeeper监听到数据变化或路径变化,就会将这个消息发送给监听线程
    1. 监听节点数据的变化:get path [watch]
    2. 监听子节点增减的变化:ls path [watch]
  5. 监听线程就会在内部调用process方法(需要自己实现process方法内容)

写数据流程

image.png

  1. Client想向Zookeeper的Server1上写数据,必须先发送一个写的请求
  2. 如果Server1不是Leader,那么Server1会把接收到的请求进一步转发给leader
  3. 这个leader会将写请求广播给每一个Server,各个Server写成功后就会通知leader
  4. 当leader收到半数以上的Server数据写成功了,那么就说明数据写成功了
  5. 随后,leader会告诉Server1数据写成功了
  6. Server1会反馈通知client数据写成功了,整个流程结束

    Zookeeper 实战

    分布式安装部署

    集群思路:先搞定一台服务器,在克隆出两台,形成集群

  7. 安装Zookeeper 参考上述

  8. 配置服务器编号

/opt/zookeeper/zkData 中创建myid文件
在文件中添加与Server对应的编号:1
其余两台服务器分别对应编号:2和3

vi myid
1

image.png

  1. 配置zoo.cfg文件

增加如下配置:添加对应的虚拟机的IP

#######################cluster##########################
server.1=172.16.150.130:2888:3888
server.2=172.16.150.131:2888:3888
server.3=172.16.150.132:2888:3888
  • 配置参数解读:server.A=B:C:D
    • A 表示第几号服务器,集群模式下的配置/opt/zookeeper/zkData.myid文件里面的数据就是A的值
    • B : 服务器的IP地址
    • C : 与集群中Leader服务器交换信息的端口
    • D :选举专用端口,万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口
  1. 配置其余两台服务器

image.png
mac电脑的可以直接克隆虚拟机:
image.png
如何配置静态IP 看如下文章

克隆的虚拟机重新生成mac地址即可
image.png
修改文件名和ifconfig中的名称一致
image.png
关闭防火墙,就可以使用任意端口访问了
查看防火墙状态

firewall-cmd --state

停止firewall

systemctl stop firewalld.service

禁止firewall开机启动

systemctl disable firewalld.service

注意:克隆虚拟机重新设置静态IP的问题

配置静态IP
使用root用户打开/etc/sysconfig/network-scripts/ifcfg-eno16777736文件,添加内容如下:

BOOTPROTO=static
ONBOOT=yes
IPADDR=192.168.72.128
GATEWAY=192.168.72.2
NETMASK=255.255.255.0
DNS1=114.114.114.114

使用命令使得配置文件生效:service network restart

如果没有发生改变重启一下即可。

  1. 集群操作

启动第一台服务器:
./zkServer.sh start
查看状态
image.png
第一台服务器失败,是因为没有超过半数以上的服务器,所以集群失败了(注意:如果防火墙没有关闭也会导致失败)。

启动第二台服务器,查看状态:
image.png

第二台服务器成为了:leader 为什么呢?因为一共三台服务器,通过选举机制,第二台服务器一票,同时得到了第一台服务器的一票,得到的票数最多,所以就成为了leader。
在查看第一台服务器的状态:
image.png

启动第三台服务器,查看状态:
image.png
这样就搭建好了集群环境:
第一台:Mode:follower
第二台:Mode:leader
第三台:Mode:follower

客户端命令行操作

  • 启动客户端 ./zkCli.sh
  • 显示帮助命令: help
  • ls -s / 查看节点的详情 ``` [zk: localhost:2181(CONNECTED) 2] ls -s / [zookeeper] cZxid = 0x0 : 创建节点的事物,每次修改zookeeper 状态都会收到一个zxid形式的时间戳,也就是Zookeeper事物id,事物id是Zookeeper中所有修改总的次序,每个修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生。

ctime = Wed Dec 31 16:00:00 PST 1969 : 被创建的毫秒数从1970年开始 mZxid = 0x0 : 最后更新事物zxid mtime = Wed Dec 31 16:00:00 PST 1969 : 最后修改的毫秒数 pZxid = 0x0 : 最后更新的子节点zxid cversion = -1 : 创建版本号,子节点修改次数 dataVersion = 0 :数据变化版本号 aclVersion = 0 :权限版本号 ephemeralOwner = 0x0 : 如果是临时节点,这个是zxnode拥有这的sessionid,如果不是临时节点则是0 dataLength = 0 : 数据长度 numChildren = 1 : 子节点数


- 创建两个节点

[zk: localhost:2181(CONNECTED) 3] create /china Created /china [zk: localhost:2181(CONNECTED) 4] create /usa Created /usa [zk: localhost:2181(CONNECTED) 5] ls -s / [china, usa, zookeeper] cZxid = 0x0 ctime = Wed Dec 31 16:00:00 PST 1969 mZxid = 0x0 mtime = Wed Dec 31 16:00:00 PST 1969 pZxid = 0x100000003 cversion = 1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 3

创建节点,并保存数据到节点上

create /ru “pujing”

获取节点数据 [zk: localhost:2181(CONNECTED) 9] get /ru pujing [zk: localhost:2181(CONNECTED) 10] get /usa null


- 多级创建节点

[zk: localhost:2181(CONNECTED) 12] create /japan Created /japan [zk: localhost:2181(CONNECTED) 13] create /japan/Tokyo “hot” Created /japan/Tokyo [zk: localhost:2181(CONNECTED) 14] get /japan/Tokyo hot


- 临时节点,创建成功之后,退出客户端,在重新连接,短暂的节点消失

create -e /uk [zk: localhost:2181(CONNECTED) 16] ls / [china, japan, ru, uk, usa, zookeeper]

执行`quit` 再次查看节点信息:uk已经没有了

[zk: localhost:2181(CONNECTED) 0] ls / [china, japan, ru, usa, zookeeper]


- 顺序节点(如果加了-e 是临时顺序节点,默认是持久节点)

[zk: localhost:2181(CONNECTED) 1] create -s /ru/city Created /ru/city0000000000 [zk: localhost:2181(CONNECTED) 2] create -s /ru/city Created /ru/city0000000001 [zk: localhost:2181(CONNECTED) 3] create -s /ru/city Created /ru/city0000000002 [zk: localhost:2181(CONNECTED) 4] ls /ru [city0000000000, city0000000001, city0000000002]


- 修改节点的数值

[china, japan, ru, usa, zookeeper] [zk: localhost:2181(CONNECTED) 6] get /japan/Tokyo hot [zk: localhost:2181(CONNECTED) 7] set /japan/Tokyo “too hot” [zk: localhost:2181(CONNECTED) 8] get /japan/Tokyo too hot


- 监听节点的值变化或子节点变化(路径变化)
1. 在Server3主机上注册监听`/usa`节点数据变化

[zk: localhost:2181(CONNECTED) 0] addWatch /usa


2. 在Server1主机上修改`/usa`的数值

[zk: localhost:2181(CONNECTED) 9] set /usa “usa”


3. Server3 会立即响应

[zk: localhost:2181(CONNECTED) 1] WATCHER::

WatchedEvent state:SyncConnected type:NodeDataChanged path:/usa


4. 在Server1主机上创建`/usa/NewYork` 子节点

[zk: localhost:2181(CONNECTED) 10] create /usa/NewYork Created /usa/NewYork


5. 在Server3主机得到的响应

WATCHER::

WatchedEvent state:SyncConnected type:NodeCreated path:/usa/NewYork


- 删除节点

[zk: localhost:2181(CONNECTED) 13] delete /usa/hua [zk: localhost:2181(CONNECTED) 14] ls /usa [NewYork] 删除的节点有子节点 不能使用delete删除 [zk: localhost:2181(CONNECTED) 15] delete /ru Node not empty: /ru

删除节点的所有内容包括子节点 [zk: localhost:2181(CONNECTED) 17] deleteall /usa [zk: localhost:2181(CONNECTED) 18] ls / [china, japan, ru, zookeeper]

<a name="Z8PnY"></a>
### API 应用
通过Java来操作Zookeeper。创建maven工程,引入依赖
```xml
    <dependencies>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <!-- zookeeper的版本必须和服务器的版本一致 -->
            <version>3.6.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

log4j的配置:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/zk.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

创建Zookeeper客户端:

public class TestZk {
    //Zookeeper的端口和IP集群
    private String connectStr = "172.16.150.130:2181,172.16.150.131:2181,172.16.150.132:2181";

    /**
     * session的时间设置,默认是ms:时间不宜设置太小,因为Zookeeper和加载集群会因为性能等原因而延迟较高
     * 如果时间太少,还没有创建好客户端,会报错
     */
    private int sessionTimout = 60 * 1000;

    private ZooKeeper zlClient;

    @Test
    public void init() throws IOException {
        //创建Zookeeper客户端,操作Zookeeper
        zlClient = new ZooKeeper(connectStr, sessionTimout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("得到监听反馈,进行业务代码处理。");
            }
        });
    }

}

运行,看下打印日志:打印出如下信息就没有问题
image.png

创建节点

    /**
     * 创建节点
     */
    @Test
    public void createNode() throws KeeperException, InterruptedException {
        /**
         * 参数1:创建节点的路径
         * 参数2:节点数据
         * 参数3:节点权限
         *  ACL对象:一个id和permission对 表示在哪些/哪个范围的id(Who)在通过了怎样的鉴权(How)之后,允许进行哪些操作(What)
         *  permission(what)一个int表示的位码,每一位代表一个对应操作的允许状态
         *  OPEN_ACL_UNSAFE: 创建开放节点,允许任意操作 - 使用最多
         *  READ_ACL_UNSAFE: 创建只读节点
         *  CREATOR_ALL_ACL: 创建者才有全部权限
         * 参数4:节点的类型:持久型、临时型节点
         */
        String str = zkClient.create("/prim", "lao".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(str);
    }

/prim节点就创建完毕了
image.png
我们去服务器中去查看节点:我么可以从部署的三台Zookeeper服务器中都可以看到该节点,这也就证实了之前所讲到的Zookeeper的特性 :::tips 数据一致性和原子性 :::

[zk: localhost:2181(CONNECTED) 1] ls /
[china, japan, prim, ru, usa, zookeeper]
[zk: localhost:2181(CONNECTED) 2] get /prim
lao

API 查询节点的数据

    /**
     * 获取节点
     */
    @Test
    public void getNode() throws KeeperException, InterruptedException {
        byte[] data = zkClient.getData("/prim", false, new Stat());
        String str = new String(data);
        System.out.println("get=" + str);
    }

更新节点数据

查询/prim的dataVersion,数据修改的版本,我们需要根据这个版本号来进行更新操作
image.png

    @Test
    public void updateNode() throws KeeperException, InterruptedException {
        //version是版本 通过ls -s /prim可以查看更新的版本
        Stat stat = zkClient.setData("/prim", "laoA".getBytes(), 0);
    }

更新之后,我们再去看一下该节点的数据和dataVersion是否发生改变:
image.png
下次在更新数据,就需要传递版本为1了,如果继续为0就会报错:
image.png
使用版本1进行更新数据

    @Test
    public void updateNode() throws KeeperException, InterruptedException {
        //version是版本 通过ls -s /prim可以查看更新的版本
        Stat stat = zkClient.setData("/prim", "laoB".getBytes(), 1);
    }
[zk: localhost:2181(CONNECTED) 7] get /prim
laoB

删除节点

:::tips 注意:删除节点,也需要传递dataVersion版本号 ::: image.png

    @Test
    public void deleteNode() throws KeeperException, InterruptedException {
        //由于我们进行了两次更新操作,所以数据版本号为2了
        zkClient.delete("/prim", 2);
    }

查询所有节点:已经没有了prim节点了
image.png

获取所有子节点

   /**
     * 获取子节点
     */
    @Test
    public void subNode() throws KeeperException, InterruptedException {
        List<String> children = zkClient.getChildren("/china", false);
        System.out.println(children);
    }

输出:[beijing", shanghai, guangzhou]

节点监听和判断

        //创建Zookeeper客户端,操作Zookeeper
        zkClient = new ZooKeeper(connectStr, sessionTimout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("得到监听反馈,进行业务代码处理。");
                System.out.println(watchedEvent.getType());
            }
        });   
/**
     * 监听根节点,下面的变化
     */
    @Test
    public void watchRootNode() throws KeeperException, InterruptedException, IOException {
        List<String> children = zkClient.getChildren("/", true);
        System.out.println(children);
        System.in.read();
    }

    @Test
    public void existsNode() throws KeeperException, InterruptedException {
        Stat exists = zkClient.exists("/prim", false);
        if (exists == null){
            System.out.println("节点不存在");
        }else {
            System.out.println("节点存在");
        }
    }

模拟美团商家上下线

美团商家服务类:


/**
 * 商家服务类
 */
public class ShopServer {
    //Zookeeper的端口和IP集群
    private String connectStr = "172.16.150.130:2181,172.16.150.131:2181,172.16.150.132:2181";

    /**
     * session的时间设置,默认是ms:时间不宜设置太小,因为Zookeeper和加载集群会因为性能等原因而延迟较高
     * 如果时间太少,还没有创建好客户端,会报错
     */
    private int sessionTimout = 60 * 1000;

    private ZooKeeper zkClient;

    /**
     * 连接Zookeeper
     */
    public void connection() throws IOException {
        zkClient = new ZooKeeper(connectStr, sessionTimout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }

    /**
     * 注册到Zookeeper
     */
    public void register(String shopName) throws KeeperException, InterruptedException {
        //在根节点提前创建meituan 一定要创建临时有序的节点,因为:
        //1. 可以自动编号;2. 断开时节点自动删除 也就意味着商家打样了;3. 创建节点就是营业
        String s = zkClient.create("/meituan/shop", shopName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("[" + shopName + "]开始营业了" + s);
    }

    /**
     * 业务逻辑 - 做生意
     *
     * @param arg
     */
    private void business(String arg) throws IOException {
        System.out.println("【" + arg + "】正在营业中");
        System.in.read();
    }

    public static void main(String[] args) throws Exception {
        // 我要开一个饭店
        ShopServer shopServer = new ShopServer();

        // 连接Zookeeper集群 和 美团取得联系
        shopServer.connection();

        // 将服务节点注册到Zookeeper(入驻美团)
        shopServer.register(args[0]);

        // 业务逻辑处理- 做生意
        shopServer.business(args[0]);
    }


}

美团用户类:在监听中,每次收到监听重新获取商家列表

public class Customers {
    public static void main(String[] args) throws Exception {
        //1. 获得Zookeeper的链接 用户打开美团APP
        Customers customers = new Customers();
        customers.connection();
        //2. 获取meituan下的所有子节点列表 - 获取商家列表
        customers.shopList();

        //3. 业务处理 - 对比商家下单点餐
        customers.business();
    }

    private void business() throws IOException {
        System.out.println("用户正在浏览商家");
        System.in.read();
    }

    private void shopList() throws KeeperException, InterruptedException {
        //对父节点进行监听
        List<String> shops = zkClient.getChildren("/meituan", true);
        //声明存储服务器信息的集合
        List<String> shopList = new ArrayList<>();
        for (String shop : shops) {
            //获取每一个节点的数据
            byte[] data = zkClient.getData("/meituan/" + shop, false, new Stat());
            shopList.add(new String(data));
        }
        System.out.println("目前正在营业的商店列表:" + shopList);
    }

    //Zookeeper的端口和IP集群
    private String connectStr = "172.16.150.130:2181,172.16.150.131:2181,172.16.150.132:2181";

    /**
     * session的时间设置,默认是ms:时间不宜设置太小,因为Zookeeper和加载集群会因为性能等原因而延迟较高
     * 如果时间太少,还没有创建好客户端,会报错
     */
    private int sessionTimout = 60 * 1000;

    private ZooKeeper zkClient;

    /**
     * 连接Zookeeper
     */
    public void connection() throws IOException {
       zkClient = new ZooKeeper(connectStr, sessionTimout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //重新再次获取商家列表
                try {
                    shopList();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

当我们运行shop创建节点:用户端就会收到监听
image.png
当我们断开商家连接一次断开,由于我们创建的临时有序的节点,所以它会按照顺序进行断开,用户端收到下线通知:
注意:这里会有一定的延迟

目前正在营业的商店列表:[jiaozi, KFC, baozi, baozi2]
目前正在营业的商店列表:[KFC, baozi, baozi2]
目前正在营业的商店列表:[baozi, baozi2]

案例-分布式锁-商品秒杀

锁:在多线程中接触过,作用就是让当前的资源不会被其他线程访问。 在Zookeeper中使用传统的锁会引发“羊群效应”:1000个人创建节点,只有一个人能成功,999人需要等待。 羊群是一种很散乱的组织,平时在一起也是盲目的左冲右撞,一旦有一头羊动起来,其他的羊也会不假思索的一哄而上,全然不顾旁边可能有狼。

image.png
为了避免上述的“羊群效应”没有组织、没有纪律,Zookeeper采用分布式锁,有组织、有纪律

  1. 所有请求进来,在/lock下创建临时顺序节点,Zookeeper会帮助编号排序
  2. 判断自己是不是/lock下最小的节点
    1. 是,获得锁 - 创建节点
    2. 否,对前面小我一级的节点进行监听
  3. 获得锁请求,处理完业务逻辑,释放锁(删除节点),后一个节点得到通知(比你年轻的死了,你成为了最嫩的了)
  4. 重复步骤2

image.png

1. 初始化数据库

在ip为131的一个linux虚拟机中创建数据库:zkproduct

-- 商品表
create table product(
id int primary key auto_increment, -- 商品编号 
product_name varchar(20) not null, -- 商品名称 
stock int not null, -- 库存
version int not null -- 版本
)
insert into product (product_name,stock,version) values('锦鲤-清空购物车-大奖',5,0)
-- 订单表
create table `order`(
id varchar(100) primary key, -- 订单编号 pid int not null, -- 商品编号
userid int not null -- 用户编号
)

2.搭建SSM工程

不再写配置代码了,直接看业务层的代码:


@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private ProductMapper productMapper;

    @Autowired
    private OrderMapper orderMapper;

    @Transactional
    @Override
    public void reduceStock(Integer id) {
        //获取库存 查询商品查询到商品则减少库存
        Product product = productMapper.getProductById(id);
        //商品存在,并且库存大于0
        if (product != null && product.getStock() > 0) {
            int i = productMapper.reduceStock(id);
            if (i == 1) {
                //减库存成功生成订单
                Order order = new Order(UUID.randomUUID().toString(), id, 1001);
                orderMapper.insertOrder(order);
            } else {
                //减库存失败
                throw new RuntimeException("减库存失败");
            }
        } else {
            throw new RuntimeException("商品已经抢光");
        }

    }
}

3. Nginx+jmeter 并发测试

启动两次工程(本机):8001 8002
如下两个端口需要修改:
image.png
注意一定要选择exploded并且application context:/
image.png
这样我们就启用了两个Tomcat。

使用nginx做负载均衡(在虚拟机),nginx配置:
注意在配置负载均衡的upstream是本机的IP地址,server_name 设置的是域名这里其实可以写localhost,为了区分,使用域名的形式,在host文件中进行修改

upstream sga{
    server 172.16.150.1:8001;
    server 172.16.150.1:8002;
}
server {
    listen       80;
    server_name  www.zk.com;
    #charset koi8-r;
    #access_log  logs/host.access.log  main;
    location / {
        proxy_pass http://sga;
        root   html;
        index  index.html index.htm;
}

host 添加:172.16.150.130 是虚拟机的IP地址
172.16.150.130 www.zk.com 其实本质上就是localhost

:wq 保存配置启动Nginx, 如果你还不太熟练Nginx看这篇文章。
Nginx 快速入门
通过Jmeter进行并发测试,首先安装jmeter,直接解压即可。apache-jmeter-5.3.zip
然后配置环境变量(mac)

# jmeter配置
export JMETER_HOME=/Users/prim/java/apache-jmeter-5.3
export PATH=$JAVA_HOME/bin:$PATH:.:$JMETER_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JMETER_HOME/lib/ext/ApacheJMeter_core.jar:$JMETER_HOME/lib/jorphan.jar:$JMETER_HOME/lib/logkit-2.0.jar

在命令行输入:jmeter 即可打开GUI工具

  1. 添加线程组:

image.png

  1. 配置线程组

image.png

  1. 配置请求

image.png
image.png

  1. 监听请求的结果

image.png

  1. 执行的结果,有6个成功了,4个失败了

image.png
在看一下数据库:库存变成了-1
image.png
订单表有六条数据:
image.png

使用Zookeeper

基于Zookeeper原生态的客户端类实现分布式非常麻烦的,使用Apache提供了Curator 封装的Zookeeper客户端来实现。

       <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.2.0</version> <!-- 网友投票最牛逼版本 -->
        </dependency>

recipes是curator族谱大全,里面包含zookeeper和framework

Curator 实现分布式锁:

@RestController
@RequestMapping("/product")
public class ProductController {

    @Autowired
    private OrderService orderService;

    //Zookeeper集群
    private String connectStr = "172.16.150.130:2181,172.16.150.131:2181,172.16.150.132:2181";


    /**
     * 减库存操作
     *
     * @param id
     * @return
     */
    @GetMapping("/reduce")
    public Object reduce(Integer id) throws Exception {
        //重试策略,1000ms 重试1次 最多试3次
        RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
        //创建curator 工具对象
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectStr, retry);
        client.start();
        //根据客户端工具对象 创建锁"内部互斥锁" "/product_" + id 每个商品创建一个节点,例如商品id为1 创建的节点就是/product_1
        InterProcessMutex lock = new InterProcessMutex(client, "/product_" + id);
        try {
            lock.acquire();
            //加锁
            orderService.reduceStock(id);
        } finally {
            //释放锁
            lock.release();
        }
        return "ok";
    }
}

重置数据库,在用jmeter跑一下:成功5个,失败了5个
image.png
我们在看一下,数据库:5个商品库存为0,分为5个用户。
image.png
image.png