CRUD
1. 原生API - CRUD
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
2. ZkClient API - CRUD
3. Curator API - CRUD
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
场景:服务动态上下线
1. 实现思路
- 服务端启动时向ZooKeeper注册节点(临时节点)。
- 客户端获取当前在线服务列表,并注册监听(一次有效)。
- 当服务器节点上下线时,ZooKeeper将新增或删除该节点,并触发监听事件。
- ZooKeeper发布服务器节点上下线通知给客户端监听者。
-
2. 服务端
/**
* @Description 服务动态上下线监控(服务端)
*/
public class DistributeServer {
private String connectString = "bigdata-hk-node1:2181,bigdata-hk-node2:2181,bigdata-hk-node3:2181";
private int sessionTimeout = 2000;
private ZooKeeper zk;
// 启动前请先创建ZNODE:`create /servers "servers"`
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
if(args==null||args.length<1){
args=new String[]{"bigdata-hk-node3"};
}
DistributeServer server = new DistributeServer();
// 1 获取zk连接
server.getConnect();
// 2 注册服务器到zk集群
server.register(args[0]);
// 3 启动业务逻辑(睡觉)
server.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void register(String hostname) throws KeeperException, InterruptedException {
String server = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(hostname +" is online") ;
System.out.println(server) ;
}
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> System.out.println("客户端初始化连接成功...."));
}
}
3. 客户端
/**
* @Description 服务动态上下线监控(客户端)
*/
public class DistributeClient {
private String connectString = "bigdata-hk-node1:2181,bigdata-hk-node2:2181,bigdata-hk-node3:2181";
private int sessionTimeout = 2000;
private ZooKeeper zk;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeClient client = new DistributeClient();
// 1 获取zk连接
client.getConnect();
// 2 监听/servers下面子节点的增加和删除
client.getServerList();
// 3 业务逻辑(睡觉)
client.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void getServerList() throws KeeperException, InterruptedException {
List<String> children = zk.getChildren("/servers", true);
ArrayList<String> servers = new ArrayList<>();
for (String child : children) {
byte[] data = zk.getData("/servers/" + child, false, null);
servers.add(new String(data));
}
System.out.println(servers);
}
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
System.out.println("获取最新服务器列表-----------------------------");
getServerList();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
场景:分布式锁
1. 实现思路
- 实现方式1(推荐)
- 实现方式2(不推荐)
2. 算法描述
3. 代码实现
1. 原生API
/**
* @Description 分布式锁(原生API)
*/
public class DistributedLock {
private String connectString = "bigdata-hk-node1:2181,bigdata-hk-node2:2181,bigdata-hk-node3:2181";
private final int sessionTimeout = 2000;
private final ZooKeeper zk;
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
private String waitPath;
private String currentMode;
public static void main(String[] args) throws Exception {
final DistributedLock lock1 = new DistributedLock("服务器1");
final DistributedLock lock2 = new DistributedLock("服务器2");
new Thread(() -> {
try {
lock1.zklock();
System.out.println("服务器1 启动,获取到锁");
int time=new Random().nextInt(8)+1;
System.out.println("服务器1 恰饭大概耗时:"+time+"秒,请等待....");
Thread.sleep(time * 1000);
lock1.unZkLock();
System.out.println("服务器1 释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
lock2.zklock();
System.out.println("服务器2 启动,获取到锁");
int time=new Random().nextInt(8)+1;
System.out.println("服务器2 咪西大概耗时:"+time+"秒,请等待....");
Thread.sleep(time * 1000);
lock2.unZkLock();
System.out.println("服务器2 释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
public DistributedLock(String clientName) throws IOException, InterruptedException, KeeperException {
// 获取连接
zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
// connectLatch 如果连接上zk 可以释放
if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// waitLatch 需要释放
if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
waitLatch.countDown();
}
});
// 等待zk正常连接后,往下走程序
connectLatch.await();
System.out.println(clientName+"成功连接上zk集群!开始抢占分布式锁....");
// 判断根节点/locks是否存在
Stat stat = zk.exists("/locks", false);
if (stat == null) {
System.out.println("不存在/locks节点,正在创建中....");
zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 对zk加锁
public void zklock() {
try {
currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// wait一小会, 让结果更清晰一些
Thread.sleep(10);
// 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号前一个节点
List<String> children = zk.getChildren("/locks", false);
// 如果children 只有一个值,那就直接获取锁; 如果有多个节点,需要判断,谁最小
if (children.size() == 1) {
return;
} else {
Collections.sort(children);
// 获取节点名称 seq-00000000
String thisNode = currentMode.substring("/locks/".length());
// 通过seq-00000000获取该节点在children集合的位置
int index = children.indexOf(thisNode);
// 判断
if (index == -1) {
System.out.println("数据异常");
} else if (index == 0) {
// 就一个节点,可以获取锁了
return;
} else {
// 需要监听 他前一个节点变化
waitPath = "/locks/" + children.get(index - 1);
zk.getData(waitPath, true, new Stat());
// 等待监听
waitLatch.await();
return;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 解锁
public void unZkLock() {
// 删除节点
try {
zk.delete(this.currentMode, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
运行结果:
服务器1成功连接上zk集群!开始抢占分布式锁....
不存在/locks节点,正在创建中....
服务器2成功连接上zk集群!开始抢占分布式锁....
服务器1 启动,获取到锁
服务器1 恰饭大概耗时:2秒,请等待....
服务器2 启动,获取到锁
服务器2 咪西大概耗时:7秒,请等待....
服务器1 释放锁
服务器2 释放锁
2. Curator API
/**
* @Description 分布式锁(Curator)
*/
public class DistributedLock {
public static void main(String[] args) {
// 创建分布式锁1
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework("服务器1"), "/locks");
// 创建分布式锁2
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework("服务器2"), "/locks");
new Thread(() -> {
try {
lock1.acquire();
System.out.println("服务器1 获取到锁");
lock1.acquire();
System.out.println("服务器1 再次获取到锁");
int time=new Random().nextInt(8)+1;
System.out.println("服务器1 恰饭大概耗时:"+time+"秒,请等待....");
Thread.sleep(time * 1000);
lock1.release();
System.out.println("服务器1 释放锁");
lock1.release();
System.out.println("服务器1 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
lock2.acquire();
System.out.println("服务器2 获取到锁");
lock2.acquire();
System.out.println("服务器2 再次获取到锁");
int time=new Random().nextInt(8)+1;
System.out.println("服务器2 咪西大概耗时:"+time+"秒,请等待....");
Thread.sleep(time * 1000);
lock2.release();
System.out.println("服务器2 释放锁");
lock2.release();
System.out.println("服务器2 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
private static CuratorFramework getCuratorFramework(String clientName) {
ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("bigdata-hk-node1:2181,bigdata-hk-node2:2181,bigdata-hk-node3:2181")
.connectionTimeoutMs(2000)
.sessionTimeoutMs(2000)
.retryPolicy(policy).build();
// 启动客户端
client.start();
System.out.println(clientName+"成功连接上zk集群!开始抢占分布式锁....");
return client;
}
}
运行结果:
服务器1成功连接上zk集群!开始抢占分布式锁....
服务器2成功连接上zk集群!开始抢占分布式锁....
服务器2 获取到锁
服务器2 再次获取到锁
服务器2 咪西大概耗时:3秒,请等待....
服务器2 释放锁
服务器2 再次释放锁
服务器1 获取到锁
服务器1 再次获取到锁
服务器1 恰饭大概耗时:2秒,请等待....
服务器1 释放锁
服务器1 再次释放锁
场景:HA
1. Hadoop HA
1. HDFS HA
1. 基本原理
Hadoop-2.0的HA机制官方介绍了有2种方式,一种是NFS(Network File System)方式,另外一种是QJM(Quorum Journal Manager)方式。
Hadoop-2.x之后,Clouera提出了QJM/Qurom Journal Manager,这是一个基于Paxos算法实现的HDFS HA方案,它给出了一种较好的解决思路和方案,示意图如下:
- 基本原理就是用2N+1台JN存储EditLog,每次写数据操作有大多数(>=N+1)返回成功时即认为该次写成功,数据不会丢失了。当然这个算法所能容忍的是最多有N台机器挂掉,如果多于N台挂掉,这个算法就失效了。这个原理是基于Paxos算法。
- 在HA架构里面SecondaryNameNode这个冷备角色已经不存在了,为了保持standby NN时时的与主Active NN的元数据保持一致,他们之间交互通过一系列守护的轻量级进程JournalNode。
- 任何修改操作在Active NN上执行时,JN进程同时也会记录修改log到至少半数以上的JN中,这时Standby NN监测到JN里面的同步log发生变化了会读取JN里面的修改log,然后同步到自己的的目录镜像树里面,如下图:
当发生故障时,Active的NN挂掉后,Standby NN会在它成为Active NN前,读取所有的JN里面的修改日志,这样就能高可靠的保证与挂掉的NN的目录镜像树一致,然后无缝的接替它的职责,维护来自客户端请求,从而达到一个高可用的目的。
2. 详细配置
修改hdfs-site.xml配置文件。
<!-- HDFS数据副本数,默认3副本:本节点+同机架节点+其他机架节点,一般不大于datanode的节点数,建议默认3副本-->
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<!-- HDFS中启用权限检查配置-->
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
<!--以下为Hadoop HA配置 -->
<!--HDFS服务逻辑名称,需要和core-site.xml中的保持一致 -->
<property>
<name>dfs.nameservices</name>
<value>ns</value>
</property>
<!-- ns下面有两个NameNode,分别是nn1,nn2 -->
<property>
<name>dfs.ha.namenodes.ns</name>
<value>nn1,nn2</value>
</property>
<!-- nn1的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.ns.nn1</name>
<value>bigdata-pro02-node1:8020</value>
</property>
<!-- nn1的http通信地址 -->
<property>
<name>dfs.namenode.http-address.ns.nn1</name>
<value>bigdata-pro02-node1:50070</value>
</property>
<!-- nn2的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.ns.nn2</name>
<value>bigdata-pro02-node2:8020</value>
</property>
<!-- nn2的http通信地址 -->
<property>
<name>dfs.namenode.http-address.ns.nn2</name>
<value>bigdata-pro02-node2:50070</value>
</property>
<!-- 指定NameNode的元数据在JournalNode上的存放位置 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://bigdata-pro02-node1:8485;bigdata-pro02-node2:8485;bigdata-pro02-node3:8485/ns</value>
</property>
<!-- 指定JournalNode在本地磁盘存放数据的位置,需创建 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/opt/modules/hadoop-2.5.0/jn</value>
</property>
<!-- 开启NameNode失败自动切换,注意加.ns -->
<property>
<name>dfs.ha.automatic-failover.enabled.ns</name>
<value>true</value>
</property>
<!-- 配置失败自动切换实现方式 -->
<property>
<name>dfs.client.failover.proxy.provider.ns</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<!-- 配置隔离机制方法(HA功能的防脑裂方法),多个机制用换行分割,即每个机制暂用一行,sshfence / shell(/bin/true)-->
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<!-- 配置sshfence隔离机制超时时间 -->
<property>
<name>dfs.ha.fencing.ssh.connect-timeout</name>
<value>30000</value>
</property>
<!-- 使用sshfence隔离机制时需要ssh免登陆 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/home/polaris/.ssh/id_rsa</value>
</property>
注:脑裂(split-brain),指在一个高可用(HA)系统中,当联系着的两个节点断开联系时,本来为一个整体的系统,分裂为两个独立节点,这时两个节点开始争抢共享资源,结果会导致系统混乱,数据损坏。
修改core-site.xml配置文件。
<!-- Hadoop nameService逻辑名称,与hdfs-size.xml保持一致 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://ns</value>
</property>
<!--WEB UI访问数据使用的用户名 -->
<property>
<name>hadoop.http.staticuser.user</name>
<value>polaris</value>
</property>
<!-- Hadoop的临时目录,服务端参数,修改需重启。NameNode的Image/Edit目录依赖此配置 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/modules/hadoop-2.5.0/tmp</value>
</property>
<!-- 指定zookeeper地址 -->
<property>
<name>ha.zookeeper.quorum</name>
<value>bigdata-pro02-node1:2181,bigdata-pro02-node2:2181,bigdata-pro02-node3.kfk.com:2181</value>
</property>
将修改的配置分发到其他节点。
cd ${HADOOP_HOME}
scp -r hdfs-site.xml polaris@bigdata-pro02-node2:${HADOOP_HOME}/etc/hadoop/hdfs-site.xml
scp -r hdfs-site.xml polaris@bigdata-pro02-node3:${HADOOP_HOME}/etc/hadoop/hdfs-site.xml
scp -r core-site.xml polaris@bigdata-pro02-node2:${HADOOP_HOME}/etc/hadoop/core-site.xml
scp -r core-site.xml polaris@bigdata-pro02-node3:${HADOOP_HOME}/etc/hadoop/core-site.xml
3. 自动故障转移测试
启动所有节点上面的ZooKeeper进程。
cd /opt/modules/zookeeper-3.4.5/bin
sh zkServer.sh start
启动所有节点上面的JournalNode进程。
cd /opt/modules/hadoop-2.5.0/
sbin/hadoop-daemon.sh start journalnode
在[nn1]上,对NameNode进行格式化,并启动。
cd /opt/modules/hadoop-2.5.0/
# NameNode 格式化(首次启动)
bin/hdfs namenode -format
# NameNode 格式化(非首次启动,同步Edits)
bin/hdfs namenode -initializeSharedEdits
# 格式化高可用,在zk上创建节点/hadoop-ha/ns
bin/hdfs zkfc -formatZK
# 启动NameNode,供其它NameNode同步元数据
bin/hdfs namenode
在[nn2]上,同步nn1元数据信息。
cd /opt/modules/hadoop-2.5.0/
bin/hdfs namenode -bootstrapStandby
nn2同步完数据后,在nn1上,按下ctrl+c来结束NameNode进程。然后关闭所有节点上面的JournalNode进程。
cd /opt/modules/hadoop-2.5.0/
sbin/hadoop-daemon.sh stop journalnode
一键启动hdfs所有相关进程。
cd /opt/modules/hadoop-2.5.0/
sbin/start-dfs.sh
# 在各个NameNode节点上启动DFSZK Failover Controller,先在那台机器启动,那个机器的NameNode就是Active NameNode
sbin/hadoop-daemon.sh start zkfc
hdfs启动之后,kill其中Active状态的NameNode,检查另外一个NameNode是否会自动切换为Active状态。同时通过命令上传文件至hdfs,检查hdfs是否可用。
2. YARN HA
1. 基本原理
ResourceManager HA由一对Active,Standby结点构成,通过RMStateStore存储内部数据和主要应用的数据及标记。目前支持的可替代的RMStateStore实现有:基于内存的MemoryRMStateStore,基于文件系统的FileSystemRMStateStore,及基于zookeeper的ZKRMStateStore。 ResourceManager HA的架构模式同NameNode HA的架构模式基本一致,数据共享由RMStateStore,而ZKFC成为 ResourceManager进程的一个服务,非独立存在。
2. 详细配置
修改mapred-site.xml配置文件。
<!-- 配置MapReduce运行环境,yarn/yarn-tez -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
修改yarn-site.xml配置文件。 ```xml
yarn.log-aggregation-enable true yarn.log-aggregation.retain-seconds 10000 yarn.nodemanager.aux-services mapreduce_shuffle
3. 将修改的配置分发到其他节点。
```bash
cd ${HADOOP_HOME}
scp -r yarn-site.xml polaris@bigdata-pro02-node2:${HADOOP_HOME}/etc/hadoop/yarn-site.xml
scp -r yarn-site.xml polaris@bigdata-pro02-node3:${HADOOP_HOME}/etc/hadoop/yarn-site.xml
scp -r mapred-site.xml polaris@bigdata-pro02-node2:${HADOOP_HOME}/etc/hadoop/mapred-site.xml
scp -r mapred-site.xml polaris@bigdata-pro02-node3:${HADOOP_HOME}/etc/hadoop/mapred-site.xml
3. 自动故障转移测试
在rm1节点上启动yarn服务(先启动zk、hdfs-ha)。
cd /opt/modules/hadoop-2.5.0/
sbin/start-yarn.sh
在rm2节点上启动ResourceManager服务。
cd /opt/modules/hadoop-2.5.0/
sbin/yarn-daemon.sh start resourcemanager
浏览器查看yarn。
Web UI:http://bigdata-pro02-node1:8088、http://bigdata-pro02-node2:8088
查看ResourceManager主备节点状态。
cd /opt/modules/hadoop-2.5.0/
# bigdata-pro02-node1节点上执行
bin/yarn rmadmin -getServiceState rm1
# bigdata-pro02-node2节点上执行
bin/yarn rmadmin -getServiceState rm2
Hadoop集群测试WordCount运行。
cd /opt/modules/hadoop-2.5.0/
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar wordcount /tmp/input /tmp/output
3. HBase HA
为了增加HBase集群的可用性,可以为HBase增加多个backup master。当master挂掉后,backup master可以自动接管整个HBase的集群。(配置好ZK后,不配置backup-masters,而直接在启动主节点HMaster后,再启动其它节点的HMaster,即可自动加入到备份HMaster列表中)。 ```bash cd ${HBASE_HOME}/conf vi backup-masters
##### 内容如下
bigdata-pro02-node2 ```