CRUD
1. 原生API - CRUD
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.7</version></dependency>
2. ZkClient API - CRUD
3. Curator API - CRUD
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.3.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.3.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>4.3.0</version></dependency>
场景:服务动态上下线
1. 实现思路

- 服务端启动时向ZooKeeper注册节点(临时节点)。
- 客户端获取当前在线服务列表,并注册监听(一次有效)。
- 当服务器节点上下线时,ZooKeeper将新增或删除该节点,并触发监听事件。
- ZooKeeper发布服务器节点上下线通知给客户端监听者。
-
2. 服务端
/*** @Description 服务动态上下线监控(服务端)*/public class DistributeServer {private String connectString = "bigdata-hk-node1:2181,bigdata-hk-node2:2181,bigdata-hk-node3:2181";private int sessionTimeout = 2000;private ZooKeeper zk;// 启动前请先创建ZNODE:`create /servers "servers"`public static void main(String[] args) throws IOException, KeeperException, InterruptedException {if(args==null||args.length<1){args=new String[]{"bigdata-hk-node3"};}DistributeServer server = new DistributeServer();// 1 获取zk连接server.getConnect();// 2 注册服务器到zk集群server.register(args[0]);// 3 启动业务逻辑(睡觉)server.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void register(String hostname) throws KeeperException, InterruptedException {String server = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);System.out.println(hostname +" is online") ;System.out.println(server) ;}private void getConnect() throws IOException {zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> System.out.println("客户端初始化连接成功...."));}}
3. 客户端
/*** @Description 服务动态上下线监控(客户端)*/public class DistributeClient {private String connectString = "bigdata-hk-node1:2181,bigdata-hk-node2:2181,bigdata-hk-node3:2181";private int sessionTimeout = 2000;private ZooKeeper zk;public static void main(String[] args) throws IOException, KeeperException, InterruptedException {DistributeClient client = new DistributeClient();// 1 获取zk连接client.getConnect();// 2 监听/servers下面子节点的增加和删除client.getServerList();// 3 业务逻辑(睡觉)client.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void getServerList() throws KeeperException, InterruptedException {List<String> children = zk.getChildren("/servers", true);ArrayList<String> servers = new ArrayList<>();for (String child : children) {byte[] data = zk.getData("/servers/" + child, false, null);servers.add(new String(data));}System.out.println(servers);}private void getConnect() throws IOException {zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {System.out.println("获取最新服务器列表-----------------------------");getServerList();} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}});}}
场景:分布式锁
1. 实现思路
- 实现方式1(推荐)

- 实现方式2(不推荐)
2. 算法描述
3. 代码实现
1. 原生API
/*** @Description 分布式锁(原生API)*/public class DistributedLock {private String connectString = "bigdata-hk-node1:2181,bigdata-hk-node2:2181,bigdata-hk-node3:2181";private final int sessionTimeout = 2000;private final ZooKeeper zk;private CountDownLatch connectLatch = new CountDownLatch(1);private CountDownLatch waitLatch = new CountDownLatch(1);private String waitPath;private String currentMode;public static void main(String[] args) throws Exception {final DistributedLock lock1 = new DistributedLock("服务器1");final DistributedLock lock2 = new DistributedLock("服务器2");new Thread(() -> {try {lock1.zklock();System.out.println("服务器1 启动,获取到锁");int time=new Random().nextInt(8)+1;System.out.println("服务器1 恰饭大概耗时:"+time+"秒,请等待....");Thread.sleep(time * 1000);lock1.unZkLock();System.out.println("服务器1 释放锁");} catch (InterruptedException e) {e.printStackTrace();}}).start();new Thread(() -> {try {lock2.zklock();System.out.println("服务器2 启动,获取到锁");int time=new Random().nextInt(8)+1;System.out.println("服务器2 咪西大概耗时:"+time+"秒,请等待....");Thread.sleep(time * 1000);lock2.unZkLock();System.out.println("服务器2 释放锁");} catch (InterruptedException e) {e.printStackTrace();}}).start();}public DistributedLock(String clientName) throws IOException, InterruptedException, KeeperException {// 获取连接zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {// connectLatch 如果连接上zk 可以释放if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {connectLatch.countDown();}// waitLatch 需要释放if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {waitLatch.countDown();}});// 等待zk正常连接后,往下走程序connectLatch.await();System.out.println(clientName+"成功连接上zk集群!开始抢占分布式锁....");// 判断根节点/locks是否存在Stat stat = zk.exists("/locks", false);if (stat == null) {System.out.println("不存在/locks节点,正在创建中....");zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}// 对zk加锁public void zklock() {try {currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// wait一小会, 让结果更清晰一些Thread.sleep(10);// 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号前一个节点List<String> children = zk.getChildren("/locks", false);// 如果children 只有一个值,那就直接获取锁; 如果有多个节点,需要判断,谁最小if (children.size() == 1) {return;} else {Collections.sort(children);// 获取节点名称 seq-00000000String thisNode = currentMode.substring("/locks/".length());// 通过seq-00000000获取该节点在children集合的位置int index = children.indexOf(thisNode);// 判断if (index == -1) {System.out.println("数据异常");} else if (index == 0) {// 就一个节点,可以获取锁了return;} else {// 需要监听 他前一个节点变化waitPath = "/locks/" + children.get(index - 1);zk.getData(waitPath, true, new Stat());// 等待监听waitLatch.await();return;}}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}// 解锁public void unZkLock() {// 删除节点try {zk.delete(this.currentMode, -1);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}}
运行结果:
服务器1成功连接上zk集群!开始抢占分布式锁....不存在/locks节点,正在创建中....服务器2成功连接上zk集群!开始抢占分布式锁....服务器1 启动,获取到锁服务器1 恰饭大概耗时:2秒,请等待....服务器2 启动,获取到锁服务器2 咪西大概耗时:7秒,请等待....服务器1 释放锁服务器2 释放锁
2. Curator API
/*** @Description 分布式锁(Curator)*/public class DistributedLock {public static void main(String[] args) {// 创建分布式锁1InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework("服务器1"), "/locks");// 创建分布式锁2InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework("服务器2"), "/locks");new Thread(() -> {try {lock1.acquire();System.out.println("服务器1 获取到锁");lock1.acquire();System.out.println("服务器1 再次获取到锁");int time=new Random().nextInt(8)+1;System.out.println("服务器1 恰饭大概耗时:"+time+"秒,请等待....");Thread.sleep(time * 1000);lock1.release();System.out.println("服务器1 释放锁");lock1.release();System.out.println("服务器1 再次释放锁");} catch (Exception e) {e.printStackTrace();}}).start();new Thread(() -> {try {lock2.acquire();System.out.println("服务器2 获取到锁");lock2.acquire();System.out.println("服务器2 再次获取到锁");int time=new Random().nextInt(8)+1;System.out.println("服务器2 咪西大概耗时:"+time+"秒,请等待....");Thread.sleep(time * 1000);lock2.release();System.out.println("服务器2 释放锁");lock2.release();System.out.println("服务器2 再次释放锁");} catch (Exception e) {e.printStackTrace();}}).start();}private static CuratorFramework getCuratorFramework(String clientName) {ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("bigdata-hk-node1:2181,bigdata-hk-node2:2181,bigdata-hk-node3:2181").connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(policy).build();// 启动客户端client.start();System.out.println(clientName+"成功连接上zk集群!开始抢占分布式锁....");return client;}}
运行结果:
服务器1成功连接上zk集群!开始抢占分布式锁....服务器2成功连接上zk集群!开始抢占分布式锁....服务器2 获取到锁服务器2 再次获取到锁服务器2 咪西大概耗时:3秒,请等待....服务器2 释放锁服务器2 再次释放锁服务器1 获取到锁服务器1 再次获取到锁服务器1 恰饭大概耗时:2秒,请等待....服务器1 释放锁服务器1 再次释放锁
场景:HA
1. Hadoop HA
1. HDFS HA
1. 基本原理
Hadoop-2.0的HA机制官方介绍了有2种方式,一种是NFS(Network File System)方式,另外一种是QJM(Quorum Journal Manager)方式。
Hadoop-2.x之后,Clouera提出了QJM/Qurom Journal Manager,这是一个基于Paxos算法实现的HDFS HA方案,它给出了一种较好的解决思路和方案,示意图如下:
- 基本原理就是用2N+1台JN存储EditLog,每次写数据操作有大多数(>=N+1)返回成功时即认为该次写成功,数据不会丢失了。当然这个算法所能容忍的是最多有N台机器挂掉,如果多于N台挂掉,这个算法就失效了。这个原理是基于Paxos算法。
- 在HA架构里面SecondaryNameNode这个冷备角色已经不存在了,为了保持standby NN时时的与主Active NN的元数据保持一致,他们之间交互通过一系列守护的轻量级进程JournalNode。
- 任何修改操作在Active NN上执行时,JN进程同时也会记录修改log到至少半数以上的JN中,这时Standby NN监测到JN里面的同步log发生变化了会读取JN里面的修改log,然后同步到自己的的目录镜像树里面,如下图:

当发生故障时,Active的NN挂掉后,Standby NN会在它成为Active NN前,读取所有的JN里面的修改日志,这样就能高可靠的保证与挂掉的NN的目录镜像树一致,然后无缝的接替它的职责,维护来自客户端请求,从而达到一个高可用的目的。
2. 详细配置
修改hdfs-site.xml配置文件。
<!-- HDFS数据副本数,默认3副本:本节点+同机架节点+其他机架节点,一般不大于datanode的节点数,建议默认3副本--><property><name>dfs.replication</name><value>2</value></property><!-- HDFS中启用权限检查配置--><property><name>dfs.permissions.enabled</name><value>false</value></property><!--以下为Hadoop HA配置 --><!--HDFS服务逻辑名称,需要和core-site.xml中的保持一致 --><property><name>dfs.nameservices</name><value>ns</value></property><!-- ns下面有两个NameNode,分别是nn1,nn2 --><property><name>dfs.ha.namenodes.ns</name><value>nn1,nn2</value></property><!-- nn1的RPC通信地址 --><property><name>dfs.namenode.rpc-address.ns.nn1</name><value>bigdata-pro02-node1:8020</value></property><!-- nn1的http通信地址 --><property><name>dfs.namenode.http-address.ns.nn1</name><value>bigdata-pro02-node1:50070</value></property><!-- nn2的RPC通信地址 --><property><name>dfs.namenode.rpc-address.ns.nn2</name><value>bigdata-pro02-node2:8020</value></property><!-- nn2的http通信地址 --><property><name>dfs.namenode.http-address.ns.nn2</name><value>bigdata-pro02-node2:50070</value></property><!-- 指定NameNode的元数据在JournalNode上的存放位置 --><property><name>dfs.namenode.shared.edits.dir</name><value>qjournal://bigdata-pro02-node1:8485;bigdata-pro02-node2:8485;bigdata-pro02-node3:8485/ns</value></property><!-- 指定JournalNode在本地磁盘存放数据的位置,需创建 --><property><name>dfs.journalnode.edits.dir</name><value>/opt/modules/hadoop-2.5.0/jn</value></property><!-- 开启NameNode失败自动切换,注意加.ns --><property><name>dfs.ha.automatic-failover.enabled.ns</name><value>true</value></property><!-- 配置失败自动切换实现方式 --><property><name>dfs.client.failover.proxy.provider.ns</name><value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value></property><!-- 配置隔离机制方法(HA功能的防脑裂方法),多个机制用换行分割,即每个机制暂用一行,sshfence / shell(/bin/true)--><property><name>dfs.ha.fencing.methods</name><value>sshfence</value></property><!-- 配置sshfence隔离机制超时时间 --><property><name>dfs.ha.fencing.ssh.connect-timeout</name><value>30000</value></property><!-- 使用sshfence隔离机制时需要ssh免登陆 --><property><name>dfs.ha.fencing.ssh.private-key-files</name><value>/home/polaris/.ssh/id_rsa</value></property>
注:脑裂(split-brain),指在一个高可用(HA)系统中,当联系着的两个节点断开联系时,本来为一个整体的系统,分裂为两个独立节点,这时两个节点开始争抢共享资源,结果会导致系统混乱,数据损坏。
修改core-site.xml配置文件。
<!-- Hadoop nameService逻辑名称,与hdfs-size.xml保持一致 --><property><name>fs.defaultFS</name><value>hdfs://ns</value></property><!--WEB UI访问数据使用的用户名 --><property><name>hadoop.http.staticuser.user</name><value>polaris</value></property><!-- Hadoop的临时目录,服务端参数,修改需重启。NameNode的Image/Edit目录依赖此配置 --><property><name>hadoop.tmp.dir</name><value>/opt/modules/hadoop-2.5.0/tmp</value></property><!-- 指定zookeeper地址 --><property><name>ha.zookeeper.quorum</name><value>bigdata-pro02-node1:2181,bigdata-pro02-node2:2181,bigdata-pro02-node3.kfk.com:2181</value></property>
将修改的配置分发到其他节点。
cd ${HADOOP_HOME}scp -r hdfs-site.xml polaris@bigdata-pro02-node2:${HADOOP_HOME}/etc/hadoop/hdfs-site.xmlscp -r hdfs-site.xml polaris@bigdata-pro02-node3:${HADOOP_HOME}/etc/hadoop/hdfs-site.xmlscp -r core-site.xml polaris@bigdata-pro02-node2:${HADOOP_HOME}/etc/hadoop/core-site.xmlscp -r core-site.xml polaris@bigdata-pro02-node3:${HADOOP_HOME}/etc/hadoop/core-site.xml
3. 自动故障转移测试
启动所有节点上面的ZooKeeper进程。
cd /opt/modules/zookeeper-3.4.5/binsh zkServer.sh start
启动所有节点上面的JournalNode进程。
cd /opt/modules/hadoop-2.5.0/sbin/hadoop-daemon.sh start journalnode
在[nn1]上,对NameNode进行格式化,并启动。
cd /opt/modules/hadoop-2.5.0/# NameNode 格式化(首次启动)bin/hdfs namenode -format# NameNode 格式化(非首次启动,同步Edits)bin/hdfs namenode -initializeSharedEdits# 格式化高可用,在zk上创建节点/hadoop-ha/nsbin/hdfs zkfc -formatZK# 启动NameNode,供其它NameNode同步元数据bin/hdfs namenode
在[nn2]上,同步nn1元数据信息。
cd /opt/modules/hadoop-2.5.0/bin/hdfs namenode -bootstrapStandby
nn2同步完数据后,在nn1上,按下ctrl+c来结束NameNode进程。然后关闭所有节点上面的JournalNode进程。
cd /opt/modules/hadoop-2.5.0/sbin/hadoop-daemon.sh stop journalnode
一键启动hdfs所有相关进程。
cd /opt/modules/hadoop-2.5.0/sbin/start-dfs.sh# 在各个NameNode节点上启动DFSZK Failover Controller,先在那台机器启动,那个机器的NameNode就是Active NameNodesbin/hadoop-daemon.sh start zkfc
hdfs启动之后,kill其中Active状态的NameNode,检查另外一个NameNode是否会自动切换为Active状态。同时通过命令上传文件至hdfs,检查hdfs是否可用。
2. YARN HA
1. 基本原理
ResourceManager HA由一对Active,Standby结点构成,通过RMStateStore存储内部数据和主要应用的数据及标记。目前支持的可替代的RMStateStore实现有:基于内存的MemoryRMStateStore,基于文件系统的FileSystemRMStateStore,及基于zookeeper的ZKRMStateStore。 ResourceManager HA的架构模式同NameNode HA的架构模式基本一致,数据共享由RMStateStore,而ZKFC成为 ResourceManager进程的一个服务,非独立存在。
2. 详细配置
修改mapred-site.xml配置文件。
<!-- 配置MapReduce运行环境,yarn/yarn-tez --><property><name>mapreduce.framework.name</name><value>yarn</value></property>
修改yarn-site.xml配置文件。 ```xml
yarn.log-aggregation-enable true yarn.log-aggregation.retain-seconds 10000 yarn.nodemanager.aux-services mapreduce_shuffle
3. 将修改的配置分发到其他节点。```bashcd ${HADOOP_HOME}scp -r yarn-site.xml polaris@bigdata-pro02-node2:${HADOOP_HOME}/etc/hadoop/yarn-site.xmlscp -r yarn-site.xml polaris@bigdata-pro02-node3:${HADOOP_HOME}/etc/hadoop/yarn-site.xmlscp -r mapred-site.xml polaris@bigdata-pro02-node2:${HADOOP_HOME}/etc/hadoop/mapred-site.xmlscp -r mapred-site.xml polaris@bigdata-pro02-node3:${HADOOP_HOME}/etc/hadoop/mapred-site.xml
3. 自动故障转移测试
在rm1节点上启动yarn服务(先启动zk、hdfs-ha)。
cd /opt/modules/hadoop-2.5.0/sbin/start-yarn.sh
在rm2节点上启动ResourceManager服务。
cd /opt/modules/hadoop-2.5.0/sbin/yarn-daemon.sh start resourcemanager
浏览器查看yarn。
Web UI:http://bigdata-pro02-node1:8088、http://bigdata-pro02-node2:8088
查看ResourceManager主备节点状态。
cd /opt/modules/hadoop-2.5.0/# bigdata-pro02-node1节点上执行bin/yarn rmadmin -getServiceState rm1# bigdata-pro02-node2节点上执行bin/yarn rmadmin -getServiceState rm2
Hadoop集群测试WordCount运行。
cd /opt/modules/hadoop-2.5.0/bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar wordcount /tmp/input /tmp/output
3. HBase HA
为了增加HBase集群的可用性,可以为HBase增加多个backup master。当master挂掉后,backup master可以自动接管整个HBase的集群。(配置好ZK后,不配置backup-masters,而直接在启动主节点HMaster后,再启动其它节点的HMaster,即可自动加入到备份HMaster列表中)。 ```bash cd ${HBASE_HOME}/conf vi backup-masters
##### 内容如下
bigdata-pro02-node2 ```
