Zookeeper 简介
zookeeper 是一个开源的分布式的,为分布式应用提供协调服务的Apache项目。 在大数据技术生态圈中:Zookeeper-动物管理员、Hadoop-大象、Hive-蜜蜂、Pig-猪等技术。
zookeeper 是一个基于观察者模式(一个人干活,有人盯着他)设计的分布式服务管理框架
Zookeeper负责 存储 和管理 数据
- 接受观察者的注册,一旦这些数据发生变化
- Zookeeper就将负责通知已经注册的那些观察者做出相应的反应
- 从而实现集群中类似Master/Slave管理模式
Zookeeper的工作机制:
Zookeeper = 文件系统 + 监听通知机制
- 商家营业入驻 注册到Zookeeper
- 获取到当前营业的饭店列表
- 服务器节点下线
- 服务器节点上下线事件通知
- 重新再去获取服务器列表,并注册监听
Zookeeper 特点
无论分布式和集群都是很多人在做事情,具体区别如下:
例如:小明有一个饭店,招聘多个工作人员
分布式:招聘1个厨师,1个服务员,1个前台,三个人负责的工作不一样,但最终的目的是为饭店工作,例如一个淘宝项目:分为订单系统、商品系统、用户系统,三个系统的工作不一样,但是都是作为一个淘宝商城的项目合为一个。
集群:招聘3个服务员,3个人的工作一致。例如淘宝项目,订单系统的订单量太大了,这时候就会给订单系统提供多台服务器,这就是集群
- 是一个leader和多个follower来组成的集群
- 集群中只要有半数以上的节点存活,Zookeeper就能正常工作(5台服务器挂2台没问题;4台服务器挂2台,就停止),一般会使用奇数台服务器
- 全局数据一致性,每台服务器都保存一份相同的数据副本,无论client连接哪台Server,数据都是一致的
- 数据更新原子性,一次数据要么成功,要么失败
- 实时性,在一定时间范围内,client能读取到最新数据
- 更新的请求按照顺序执行,会按照发送过来的顺序,逐一执行
数据结构
Zookeeper数据模型的结构与Linux文件系统类似,整体上可以看作是一棵树,每个节点成做一个ZNode(ZookeeperNode)
每个ZNode默认能够存储1MB的数据(元数据),每个ZNode的路径都是唯一的。
元数据(Metadata):又称中介数据,中继数据,为描述数据的数据,主要描述数据属性的信息,用来支持指示:存储位置、历史数据、资源查找、文件记录等功能
应用场景
zookeeper提供服务包括:统一命名服务、统一配置管理、统一集群管理,服务器节点动态上下线、软负载均衡等
统一命名服务
在分布式场景下,通常需要对应用或服务进行统一的命名,便于识别
统一配置管理
在分布式环境下,配置文件做同步是必经之路,例如1000台服务器,如果配置文件作出修改,那一台一台的修改肯定不行,Zookeeper可以做到一处修改快速同步到每台服务器上
- 将配置信息写入到Zookeeper的某个节点上
- 每个客户端都监听这个节点
- 一旦节点中的数据文件被修改,Zookeeper就会通知每台客户端服务器
服务器节点动态上下线
软负载均衡
Zookeeper会记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户请求
Zookeeper 本地模式安装
Zookeeper镜像库地址:本篇文章使用的是3.6.0的版本
apache-zookeeper-3.6.0-bin.tar.gz bin已经自带所需要的各种jar包
apache-jmeter-5.3.zip
需要在Linux虚拟机中进行安装,首先需要安装jdk
- 将Zookeeper包拷贝到opt目录下,解压:
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz
重命名为:zookeeper
- 配置修改 ```shell 复制配置文件:zoo_sample.cfg cp zoo_sample.cfg zoo.cfg vi zoo.cfg
修改配置 修改dataDir目录
dataDir=/opt/zookeeper/ZkData dataLogDir=/opt/zookeeper/zkLog
创建目录:ZkData zkLog
mkdir ZkData mkdir zkLog
![image.png](https://cdn.nlark.com/yuque/0/2021/png/375694/1611640507876-bb6a35bf-1c1b-4ce2-ad2c-6fd5e2a81a34.png#height=185&id=hmEDR&margin=%5Bobject%20Object%5D&name=image.png&originHeight=370&originWidth=874&originalType=binary&ratio=1&size=181410&status=done&style=none&width=437)<br />这样Zookeeper安装就完成了。<br />启动Zookeeper:`zkServer.sh`
`./zkServer.sh start`<br />注意 将8080端口释放出来,否则zookeeper启动会出现错误<br />不能和Tomcat部署在同一台机器上,因为zookeeper和tomcat同样默认使用的都是8080端口;<br />修改端口,如果必须部署到一台服务器上,则选择修改tomcat的端口号,将tomcat的8080释放出来;
停止Zookeeper `./zkServer.sh stop`
查看Zookeeper的状态:`./zkServer.sh status`可以看到zookeeper的端口是2181
```shell
[root@localhost bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: standalone
# 注意如果出现如下错误,检查配置文件中配置的文件是否创建;第二查看8080端口是否
# 被占用,如果被占用记得释放8080端口
Error contacting service. It is probably not running.
启动客户端:./zkCli.sh
退出客户端输入quit
即可
配置参数详解:
Zookeeper中的配置文件zoo.cfg中参数含义解读如下:
tickTime =2000:通信心跳数,Zookeeper服务器与客户端心跳时间,单位毫秒
Zookeeper使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就
是每个tickTime时间就会发送一个心跳,时间单位为毫秒。
initLimit =10:LF初始通信时限
集群中的Follower跟随者服务器与Leader领导者服务器之间,启动时能容忍的最多心跳数 10*2000(10个心跳时间)如果领导和跟随者没有发出心跳通信,就视为失效的连接,领导 和跟随者彻底断开
syncLimit =5:LF同步通信时限
集群启动后,Leader与Follower之间的最大响应时间单位,假如响应超过syncLimit *
tickTime->10秒,Leader就认为Follwer已经死掉,会将Follwer从服务器列表中删除
dataDir:数据文件目录+数据持久化路径
主要用于保存Zookeeper中的数据。
dataLogDir:日志文件目录
clientPort =2181:客户端连接端口 监听客户端连接的端口。
Zookeeper 内部原理
选举机制(面试常问)
- 半数机制:集群中半数以上机器存活,集群可用。所以Zookeeper适合安装奇数台服务器
- 虽然在配置文件中并没有指定Master和Slave,但在Zookeeper工作时,有一个节点为Leader,其他则为Follower,Leader是通过内部选举机制临时产生的
- Server1 先投票给自己,自己得1票,没有超过半数,根本无法成为Leader,那么Server就会把自己的票数(1票)顺水退舟投给了id比自己大的Server2
- Server2也把自己的票数投给自己,再加上Server1给的票数,总票数为2票没有超过半数,无法成为Leader,那么Server2将自己的票数(2票)投给了id比自己大的Server3
- Server3得到了Server1和Server2的两票,在加上自己投给自己的一票,3票超过半数,成为leader,既然Server3成为了Leader那么就没有必要把自己的票数投给Server4了
- Server4和Server5都投给自己,但是无法改变Server3的票数,只好听天由命,承认Server3是leader
节点类型
- 持久型 persustent
- 持久化目录节点:客户端与Zookeeper断开连接后,该节点依旧存在
- 持久化顺序编号目录节点:客户端与Zookeeper断开连接后,该节点依旧存在,创建znode设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护:Znode001,Znode002
- 短暂型 ephemeral
- 临时目录节点:客户端与服务器端断开连接后,创建的节点自动删除
- 临时顺序编号目录节点:客户端与Zookeeper断开连接后,该节点被删除,创建znode设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护:Znode001,Znode002
注意:序号是相当于i++和数据库中的自增长类似
监听器原理(面试常问)
- 在main方法中创建Zookeeper客户端的同时就会创建两个线程,一个负责网络连接通信,一个负责监听
- 监听事件就会通过网络通信发送到Zookeeper
- Zookeeper获得注册的监听事件后,立刻将监听事件添加到监听列表里
- Zookeeper监听到数据变化或路径变化,就会将这个消息发送给监听线程
- 监听节点数据的变化:get path [watch]
- 监听子节点增减的变化:ls path [watch]
- 监听线程就会在内部调用process方法(需要自己实现process方法内容)
写数据流程
- Client想向Zookeeper的Server1上写数据,必须先发送一个写的请求
- 如果Server1不是Leader,那么Server1会把接收到的请求进一步转发给leader
- 这个leader会将写请求广播给每一个Server,各个Server写成功后就会通知leader
- 当leader收到半数以上的Server数据写成功了,那么就说明数据写成功了
- 随后,leader会告诉Server1数据写成功了
Server1会反馈通知client数据写成功了,整个流程结束
Zookeeper 实战
分布式安装部署
集群思路:先搞定一台服务器,在克隆出两台,形成集群
安装Zookeeper 参考上述
- 配置服务器编号
在/opt/zookeeper/zkData
中创建myid文件
在文件中添加与Server对应的编号:1
其余两台服务器分别对应编号:2和3
vi myid
1
- 配置zoo.cfg文件
增加如下配置:添加对应的虚拟机的IP
#######################cluster##########################
server.1=172.16.150.130:2888:3888
server.2=172.16.150.131:2888:3888
server.3=172.16.150.132:2888:3888
- 配置参数解读:server.A=B:C:D
- A 表示第几号服务器,集群模式下的配置/opt/zookeeper/zkData.myid文件里面的数据就是A的值
- B : 服务器的IP地址
- C : 与集群中Leader服务器交换信息的端口
- D :选举专用端口,万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口
- 配置其余两台服务器
mac电脑的可以直接克隆虚拟机:
如何配置静态IP 看如下文章
克隆的虚拟机重新生成mac地址即可
修改文件名和ifconfig中的名称一致
关闭防火墙,就可以使用任意端口访问了
查看防火墙状态
firewall-cmd --state
停止firewall
systemctl stop firewalld.service
禁止firewall开机启动
systemctl disable firewalld.service
注意:克隆虚拟机重新设置静态IP的问题
配置静态IP
使用root用户打开/etc/sysconfig/network-scripts/ifcfg-eno16777736文件,添加内容如下:
BOOTPROTO=static
ONBOOT=yes
IPADDR=192.168.72.128
GATEWAY=192.168.72.2
NETMASK=255.255.255.0
DNS1=114.114.114.114
使用命令使得配置文件生效:service network restart
如果没有发生改变重启一下即可。
- 集群操作
启动第一台服务器:./zkServer.sh start
查看状态
第一台服务器失败,是因为没有超过半数以上的服务器,所以集群失败了(注意:如果防火墙没有关闭也会导致失败)。
启动第二台服务器,查看状态:
第二台服务器成为了:leader
为什么呢?因为一共三台服务器,通过选举机制,第二台服务器一票,同时得到了第一台服务器的一票,得到的票数最多,所以就成为了leader。
在查看第一台服务器的状态:
启动第三台服务器,查看状态:
这样就搭建好了集群环境:
第一台:Mode:follower
第二台:Mode:leader
第三台:Mode:follower
客户端命令行操作
- 启动客户端
./zkCli.sh
- 显示帮助命令:
help
ls -s /
查看节点的详情 ``` [zk: localhost:2181(CONNECTED) 2] ls -s / [zookeeper] cZxid = 0x0 : 创建节点的事物,每次修改zookeeper 状态都会收到一个zxid形式的时间戳,也就是Zookeeper事物id,事物id是Zookeeper中所有修改总的次序,每个修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生。
ctime = Wed Dec 31 16:00:00 PST 1969 : 被创建的毫秒数从1970年开始 mZxid = 0x0 : 最后更新事物zxid mtime = Wed Dec 31 16:00:00 PST 1969 : 最后修改的毫秒数 pZxid = 0x0 : 最后更新的子节点zxid cversion = -1 : 创建版本号,子节点修改次数 dataVersion = 0 :数据变化版本号 aclVersion = 0 :权限版本号 ephemeralOwner = 0x0 : 如果是临时节点,这个是zxnode拥有这的sessionid,如果不是临时节点则是0 dataLength = 0 : 数据长度 numChildren = 1 : 子节点数
- 创建两个节点
[zk: localhost:2181(CONNECTED) 3] create /china Created /china [zk: localhost:2181(CONNECTED) 4] create /usa Created /usa [zk: localhost:2181(CONNECTED) 5] ls -s / [china, usa, zookeeper] cZxid = 0x0 ctime = Wed Dec 31 16:00:00 PST 1969 mZxid = 0x0 mtime = Wed Dec 31 16:00:00 PST 1969 pZxid = 0x100000003 cversion = 1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 3
创建节点,并保存数据到节点上
create /ru “pujing”
获取节点数据 [zk: localhost:2181(CONNECTED) 9] get /ru pujing [zk: localhost:2181(CONNECTED) 10] get /usa null
- 多级创建节点
[zk: localhost:2181(CONNECTED) 12] create /japan Created /japan [zk: localhost:2181(CONNECTED) 13] create /japan/Tokyo “hot” Created /japan/Tokyo [zk: localhost:2181(CONNECTED) 14] get /japan/Tokyo hot
- 临时节点,创建成功之后,退出客户端,在重新连接,短暂的节点消失
create -e /uk [zk: localhost:2181(CONNECTED) 16] ls / [china, japan, ru, uk, usa, zookeeper]
执行`quit` 再次查看节点信息:uk已经没有了
[zk: localhost:2181(CONNECTED) 0] ls / [china, japan, ru, usa, zookeeper]
- 顺序节点(如果加了-e 是临时顺序节点,默认是持久节点)
[zk: localhost:2181(CONNECTED) 1] create -s /ru/city Created /ru/city0000000000 [zk: localhost:2181(CONNECTED) 2] create -s /ru/city Created /ru/city0000000001 [zk: localhost:2181(CONNECTED) 3] create -s /ru/city Created /ru/city0000000002 [zk: localhost:2181(CONNECTED) 4] ls /ru [city0000000000, city0000000001, city0000000002]
- 修改节点的数值
[china, japan, ru, usa, zookeeper] [zk: localhost:2181(CONNECTED) 6] get /japan/Tokyo hot [zk: localhost:2181(CONNECTED) 7] set /japan/Tokyo “too hot” [zk: localhost:2181(CONNECTED) 8] get /japan/Tokyo too hot
- 监听节点的值变化或子节点变化(路径变化)
1. 在Server3主机上注册监听`/usa`节点数据变化
[zk: localhost:2181(CONNECTED) 0] addWatch /usa
2. 在Server1主机上修改`/usa`的数值
[zk: localhost:2181(CONNECTED) 9] set /usa “usa”
3. Server3 会立即响应
[zk: localhost:2181(CONNECTED) 1] WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/usa
4. 在Server1主机上创建`/usa/NewYork` 子节点
[zk: localhost:2181(CONNECTED) 10] create /usa/NewYork Created /usa/NewYork
5. 在Server3主机得到的响应
WATCHER::
WatchedEvent state:SyncConnected type:NodeCreated path:/usa/NewYork
- 删除节点
[zk: localhost:2181(CONNECTED) 13] delete /usa/hua [zk: localhost:2181(CONNECTED) 14] ls /usa [NewYork] 删除的节点有子节点 不能使用delete删除 [zk: localhost:2181(CONNECTED) 15] delete /ru Node not empty: /ru
删除节点的所有内容包括子节点 [zk: localhost:2181(CONNECTED) 17] deleteall /usa [zk: localhost:2181(CONNECTED) 18] ls / [china, japan, ru, zookeeper]
<a name="Z8PnY"></a>
### API 应用
通过Java来操作Zookeeper。创建maven工程,引入依赖
```xml
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<!-- zookeeper的版本必须和服务器的版本一致 -->
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
log4j的配置:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/zk.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
创建Zookeeper客户端:
public class TestZk {
//Zookeeper的端口和IP集群
private String connectStr = "172.16.150.130:2181,172.16.150.131:2181,172.16.150.132:2181";
/**
* session的时间设置,默认是ms:时间不宜设置太小,因为Zookeeper和加载集群会因为性能等原因而延迟较高
* 如果时间太少,还没有创建好客户端,会报错
*/
private int sessionTimout = 60 * 1000;
private ZooKeeper zlClient;
@Test
public void init() throws IOException {
//创建Zookeeper客户端,操作Zookeeper
zlClient = new ZooKeeper(connectStr, sessionTimout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("得到监听反馈,进行业务代码处理。");
}
});
}
}
创建节点
/**
* 创建节点
*/
@Test
public void createNode() throws KeeperException, InterruptedException {
/**
* 参数1:创建节点的路径
* 参数2:节点数据
* 参数3:节点权限
* ACL对象:一个id和permission对 表示在哪些/哪个范围的id(Who)在通过了怎样的鉴权(How)之后,允许进行哪些操作(What)
* permission(what)一个int表示的位码,每一位代表一个对应操作的允许状态
* OPEN_ACL_UNSAFE: 创建开放节点,允许任意操作 - 使用最多
* READ_ACL_UNSAFE: 创建只读节点
* CREATOR_ALL_ACL: 创建者才有全部权限
* 参数4:节点的类型:持久型、临时型节点
*/
String str = zkClient.create("/prim", "lao".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(str);
}
/prim节点就创建完毕了
我们去服务器中去查看节点:我么可以从部署的三台Zookeeper服务器中都可以看到该节点,这也就证实了之前所讲到的Zookeeper的特性
:::tips
数据一致性和原子性
:::
[zk: localhost:2181(CONNECTED) 1] ls /
[china, japan, prim, ru, usa, zookeeper]
[zk: localhost:2181(CONNECTED) 2] get /prim
lao
API 查询节点的数据
/**
* 获取节点
*/
@Test
public void getNode() throws KeeperException, InterruptedException {
byte[] data = zkClient.getData("/prim", false, new Stat());
String str = new String(data);
System.out.println("get=" + str);
}
更新节点数据
查询/prim的dataVersion,数据修改的版本,我们需要根据这个版本号来进行更新操作
@Test
public void updateNode() throws KeeperException, InterruptedException {
//version是版本 通过ls -s /prim可以查看更新的版本
Stat stat = zkClient.setData("/prim", "laoA".getBytes(), 0);
}
更新之后,我们再去看一下该节点的数据和dataVersion是否发生改变:
下次在更新数据,就需要传递版本为1了,如果继续为0就会报错:
使用版本1进行更新数据
@Test
public void updateNode() throws KeeperException, InterruptedException {
//version是版本 通过ls -s /prim可以查看更新的版本
Stat stat = zkClient.setData("/prim", "laoB".getBytes(), 1);
}
[zk: localhost:2181(CONNECTED) 7] get /prim
laoB
删除节点
:::tips 注意:删除节点,也需要传递dataVersion版本号 :::
@Test
public void deleteNode() throws KeeperException, InterruptedException {
//由于我们进行了两次更新操作,所以数据版本号为2了
zkClient.delete("/prim", 2);
}
获取所有子节点
/**
* 获取子节点
*/
@Test
public void subNode() throws KeeperException, InterruptedException {
List<String> children = zkClient.getChildren("/china", false);
System.out.println(children);
}
输出:[beijing", shanghai, guangzhou]
节点监听和判断
//创建Zookeeper客户端,操作Zookeeper
zkClient = new ZooKeeper(connectStr, sessionTimout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("得到监听反馈,进行业务代码处理。");
System.out.println(watchedEvent.getType());
}
});
/**
* 监听根节点,下面的变化
*/
@Test
public void watchRootNode() throws KeeperException, InterruptedException, IOException {
List<String> children = zkClient.getChildren("/", true);
System.out.println(children);
System.in.read();
}
@Test
public void existsNode() throws KeeperException, InterruptedException {
Stat exists = zkClient.exists("/prim", false);
if (exists == null){
System.out.println("节点不存在");
}else {
System.out.println("节点存在");
}
}
模拟美团商家上下线
美团商家服务类:
/**
* 商家服务类
*/
public class ShopServer {
//Zookeeper的端口和IP集群
private String connectStr = "172.16.150.130:2181,172.16.150.131:2181,172.16.150.132:2181";
/**
* session的时间设置,默认是ms:时间不宜设置太小,因为Zookeeper和加载集群会因为性能等原因而延迟较高
* 如果时间太少,还没有创建好客户端,会报错
*/
private int sessionTimout = 60 * 1000;
private ZooKeeper zkClient;
/**
* 连接Zookeeper
*/
public void connection() throws IOException {
zkClient = new ZooKeeper(connectStr, sessionTimout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
/**
* 注册到Zookeeper
*/
public void register(String shopName) throws KeeperException, InterruptedException {
//在根节点提前创建meituan 一定要创建临时有序的节点,因为:
//1. 可以自动编号;2. 断开时节点自动删除 也就意味着商家打样了;3. 创建节点就是营业
String s = zkClient.create("/meituan/shop", shopName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("[" + shopName + "]开始营业了" + s);
}
/**
* 业务逻辑 - 做生意
*
* @param arg
*/
private void business(String arg) throws IOException {
System.out.println("【" + arg + "】正在营业中");
System.in.read();
}
public static void main(String[] args) throws Exception {
// 我要开一个饭店
ShopServer shopServer = new ShopServer();
// 连接Zookeeper集群 和 美团取得联系
shopServer.connection();
// 将服务节点注册到Zookeeper(入驻美团)
shopServer.register(args[0]);
// 业务逻辑处理- 做生意
shopServer.business(args[0]);
}
}
美团用户类:在监听中,每次收到监听重新获取商家列表
public class Customers {
public static void main(String[] args) throws Exception {
//1. 获得Zookeeper的链接 用户打开美团APP
Customers customers = new Customers();
customers.connection();
//2. 获取meituan下的所有子节点列表 - 获取商家列表
customers.shopList();
//3. 业务处理 - 对比商家下单点餐
customers.business();
}
private void business() throws IOException {
System.out.println("用户正在浏览商家");
System.in.read();
}
private void shopList() throws KeeperException, InterruptedException {
//对父节点进行监听
List<String> shops = zkClient.getChildren("/meituan", true);
//声明存储服务器信息的集合
List<String> shopList = new ArrayList<>();
for (String shop : shops) {
//获取每一个节点的数据
byte[] data = zkClient.getData("/meituan/" + shop, false, new Stat());
shopList.add(new String(data));
}
System.out.println("目前正在营业的商店列表:" + shopList);
}
//Zookeeper的端口和IP集群
private String connectStr = "172.16.150.130:2181,172.16.150.131:2181,172.16.150.132:2181";
/**
* session的时间设置,默认是ms:时间不宜设置太小,因为Zookeeper和加载集群会因为性能等原因而延迟较高
* 如果时间太少,还没有创建好客户端,会报错
*/
private int sessionTimout = 60 * 1000;
private ZooKeeper zkClient;
/**
* 连接Zookeeper
*/
public void connection() throws IOException {
zkClient = new ZooKeeper(connectStr, sessionTimout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//重新再次获取商家列表
try {
shopList();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
当我们运行shop创建节点:用户端就会收到监听
当我们断开商家连接一次断开,由于我们创建的临时有序的节点,所以它会按照顺序进行断开,用户端收到下线通知:
注意:这里会有一定的延迟
目前正在营业的商店列表:[jiaozi, KFC, baozi, baozi2]
目前正在营业的商店列表:[KFC, baozi, baozi2]
目前正在营业的商店列表:[baozi, baozi2]
案例-分布式锁-商品秒杀
锁:在多线程中接触过,作用就是让当前的资源不会被其他线程访问。 在Zookeeper中使用传统的锁会引发“羊群效应”:1000个人创建节点,只有一个人能成功,999人需要等待。 羊群是一种很散乱的组织,平时在一起也是盲目的左冲右撞,一旦有一头羊动起来,其他的羊也会不假思索的一哄而上,全然不顾旁边可能有狼。
为了避免上述的“羊群效应”没有组织、没有纪律,Zookeeper采用分布式锁,有组织、有纪律
- 所有请求进来,在/lock下创建临时顺序节点,Zookeeper会帮助编号排序
- 判断自己是不是/lock下最小的节点
- 是,获得锁 - 创建节点
- 否,对前面小我一级的节点进行监听
- 获得锁请求,处理完业务逻辑,释放锁(删除节点),后一个节点得到通知(比你年轻的死了,你成为了最嫩的了)
- 重复步骤2
1. 初始化数据库
在ip为131的一个linux虚拟机中创建数据库:zkproduct
-- 商品表
create table product(
id int primary key auto_increment, -- 商品编号
product_name varchar(20) not null, -- 商品名称
stock int not null, -- 库存
version int not null -- 版本
)
insert into product (product_name,stock,version) values('锦鲤-清空购物车-大奖',5,0)
-- 订单表
create table `order`(
id varchar(100) primary key, -- 订单编号 pid int not null, -- 商品编号
userid int not null -- 用户编号
)
2.搭建SSM工程
不再写配置代码了,直接看业务层的代码:
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private ProductMapper productMapper;
@Autowired
private OrderMapper orderMapper;
@Transactional
@Override
public void reduceStock(Integer id) {
//获取库存 查询商品查询到商品则减少库存
Product product = productMapper.getProductById(id);
//商品存在,并且库存大于0
if (product != null && product.getStock() > 0) {
int i = productMapper.reduceStock(id);
if (i == 1) {
//减库存成功生成订单
Order order = new Order(UUID.randomUUID().toString(), id, 1001);
orderMapper.insertOrder(order);
} else {
//减库存失败
throw new RuntimeException("减库存失败");
}
} else {
throw new RuntimeException("商品已经抢光");
}
}
}
3. Nginx+jmeter 并发测试
启动两次工程(本机):8001 8002
如下两个端口需要修改:
注意一定要选择exploded并且application context:/
这样我们就启用了两个Tomcat。
使用nginx做负载均衡(在虚拟机),nginx配置:
注意在配置负载均衡的upstream是本机的IP地址,server_name 设置的是域名这里其实可以写localhost,为了区分,使用域名的形式,在host文件中进行修改
upstream sga{
server 172.16.150.1:8001;
server 172.16.150.1:8002;
}
server {
listen 80;
server_name www.zk.com;
#charset koi8-r;
#access_log logs/host.access.log main;
location / {
proxy_pass http://sga;
root html;
index index.html index.htm;
}
host 添加:172.16.150.130 是虚拟机的IP地址
172.16.150.130 www.zk.com
其实本质上就是localhost
:wq
保存配置启动Nginx, 如果你还不太熟练Nginx看这篇文章。
Nginx 快速入门
通过Jmeter进行并发测试,首先安装jmeter,直接解压即可。apache-jmeter-5.3.zip
然后配置环境变量(mac)
# jmeter配置
export JMETER_HOME=/Users/prim/java/apache-jmeter-5.3
export PATH=$JAVA_HOME/bin:$PATH:.:$JMETER_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JMETER_HOME/lib/ext/ApacheJMeter_core.jar:$JMETER_HOME/lib/jorphan.jar:$JMETER_HOME/lib/logkit-2.0.jar
在命令行输入:jmeter
即可打开GUI工具
- 添加线程组:
- 配置线程组
- 配置请求
- 监听请求的结果
- 执行的结果,有6个成功了,4个失败了
使用Zookeeper
基于Zookeeper原生态的客户端类实现分布式非常麻烦的,使用Apache提供了Curator 封装的Zookeeper客户端来实现。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version> <!-- 网友投票最牛逼版本 -->
</dependency>
recipes是curator族谱大全,里面包含zookeeper和framework
Curator 实现分布式锁:
@RestController
@RequestMapping("/product")
public class ProductController {
@Autowired
private OrderService orderService;
//Zookeeper集群
private String connectStr = "172.16.150.130:2181,172.16.150.131:2181,172.16.150.132:2181";
/**
* 减库存操作
*
* @param id
* @return
*/
@GetMapping("/reduce")
public Object reduce(Integer id) throws Exception {
//重试策略,1000ms 重试1次 最多试3次
RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
//创建curator 工具对象
CuratorFramework client = CuratorFrameworkFactory.newClient(connectStr, retry);
client.start();
//根据客户端工具对象 创建锁"内部互斥锁" "/product_" + id 每个商品创建一个节点,例如商品id为1 创建的节点就是/product_1
InterProcessMutex lock = new InterProcessMutex(client, "/product_" + id);
try {
lock.acquire();
//加锁
orderService.reduceStock(id);
} finally {
//释放锁
lock.release();
}
return "ok";
}
}
重置数据库,在用jmeter跑一下:成功5个,失败了5个
我们在看一下,数据库:5个商品库存为0,分为5个用户。