1. Master Election
Consider a system that must provide service 24/7 with no single point of failure. We therefore deploy a cluster, using a Master+Slave design: the cluster has one primary machine and several standbys. The master serves requests while the standbys monitor its state; the moment the master goes down, a standby must quickly take over and continue serving. The process of choosing one machine from the standbys to become the new master is called master election.
Architecture diagram
On the left is the ZooKeeper cluster; on the right are three work servers. When a work server starts, it creates an ephemeral node under ZooKeeper's Servers node and writes its basic information into it. This is called service registration. Other services in the system can learn which servers are currently available by fetching the child list of the Servers node; this is called service discovery. Next, each server tries to create the ephemeral Master node; whichever succeeds becomes the Master, and the other two act as Slaves. Every work server must watch for the deletion of the Master node: by listening for its delete event, they learn whether the master has gone down (once the server that created an ephemeral node dies, the node is deleted automatically). As soon as the master goes down, a new round of master election must begin.
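The whole pattern hinges on one ZooKeeper property: an ephemeral node lives only as long as the session that created it. A minimal sketch of registration and discovery with ZkClient (the address and paths here are placeholders, not the article's demo values):
// Minimal sketch: ephemeral nodes vanish when their session ends (placeholder paths).
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000, 5000, new SerializableSerializer());
zkClient.createPersistent("/Servers", true); // parent node for registrations
zkClient.createEphemeral("/Servers/server-1", "basic info"); // service registration
System.out.println(zkClient.getChildren("/Servers")); // service discovery: [server-1]
zkClient.close(); // session ends; ZooKeeper deletes /Servers/server-1 automatically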
Flow chart
Core class diagram
WorkServer corresponds to the Work Server in the architecture diagram and is the main worker class;
RunningData describes a WorkServer's basic information;
LeaderSelectorZkClient acts as the scheduler that starts and stops the WorkServers.
Code implementation
/**
 * Work server information
 */
public class RunningData implements Serializable {
private static final long serialVersionUID = 4260577459043203630L;
private Long cid;
private String name;
public Long getCid() {
return cid;
}
public void setCid(Long cid) {
this.cid = cid;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
/**
 * Work server
 */
public class WorkServer {
// tracks whether this server is running
private volatile boolean running = false;
private ZkClient zkClient;
// path of the Master node in ZooKeeper
private static final String MASTER_PATH = "/master";
// listens for the Master node's delete event
private IZkDataListener dataListener;
// basic information of this server
private RunningData serverData;
// basic information of the cluster's current Master
private RunningData masterData;
private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
private int delayTime = 5;
public WorkServer(RunningData rd) {
this.serverData = rd; // record this server's basic information
this.dataListener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
if (masterData != null && masterData.getName().equals(serverData.getName())){
// we were the Master in the previous round, so retake mastership immediately
takeMaster();
} else {
// otherwise wait 5 seconds before contending again: this absorbs network jitter and gives
// the previous Master first claim on the role, avoiding unnecessary data-migration overhead
delayExector.schedule(new Runnable(){
public void run(){
takeMaster();
}
}, delayTime, TimeUnit.SECONDS);
}
}
public void handleDataChange(String dataPath, Object data)
throws Exception {
}
};
}
public ZkClient getZkClient() {
return zkClient;
}
public void setZkClient(ZkClient zkClient) {
this.zkClient = zkClient;
}
// Start the server
public void start() throws Exception {
if (running) {
throw new Exception("server has already started...");
}
running = true;
// subscribe to the Master node's delete event
zkClient.subscribeDataChanges(MASTER_PATH, dataListener);
// contend for mastership
takeMaster();
}
// Stop the server
public void stop() throws Exception {
if (!running) {
throw new Exception("server has already stopped...");
}
running = false;
delayExector.shutdown();
// unsubscribe from Master node events
zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);
// release mastership
releaseMaster();
}
// Contend for mastership
private void takeMaster() {
if (!running)
return;
try {
// try to create the ephemeral Master node
zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL);
masterData = serverData;
System.out.println(serverData.getName()+" is master");
// for demo purposes, each server releases mastership every 5 seconds
delayExector.schedule(new Runnable() {
public void run() {
if (checkMaster()){
releaseMaster();
}
}
}, 5, TimeUnit.SECONDS);
} catch (ZkNodeExistsException e) { // another server created it first
// read the Master node's information
RunningData runningData = zkClient.readData(MASTER_PATH, true);
if (runningData == null) {
takeMaster(); // read nothing: the Master died at that instant, so we get another chance to contend
} else {
masterData = runningData;
}
} catch (Exception e) {
// ignore;
}
}
// Release mastership
private void releaseMaster() {
if (checkMaster()) {
zkClient.delete(MASTER_PATH);
}
}
// Check whether this server is the Master
private boolean checkMaster() {
try {
RunningData eventData = zkClient.readData(MASTER_PATH);
masterData = eventData;
if (masterData.getName().equals(serverData.getName())) {
return true;
}
return false;
} catch (ZkNoNodeException e) {
return false; // the node is gone, so we are certainly not the Master
} catch (ZkInterruptedException e) {
return checkMaster();
} catch (ZkException e) {
return false;
}
}
}
/**
 * Scheduler
 */
public class LeaderSelectorZkClient {
// number of servers to start
private static final int CLIENT_QTY = 10;
// address of the ZooKeeper server
private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";
public static void main(String[] args) throws Exception {
// list of all ZkClient instances
List<ZkClient> clients = new ArrayList<ZkClient>();
// list of all work servers
List<WorkServer> workServers = new ArrayList<WorkServer>();
try {
for ( int i = 0; i < CLIENT_QTY; ++i ) { // simulate creating and starting 10 servers
// create the ZkClient
ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer());
clients.add(client);
// create this server's RunningData
RunningData runningData = new RunningData();
runningData.setCid(Long.valueOf(i));
runningData.setName("Client #" + i);
// create the work server
WorkServer workServer = new WorkServer(runningData);
workServer.setZkClient(client);
workServers.add(workServer);
workServer.start();
}
System.out.println("敲回车键退出!\n");
new BufferedReader(new InputStreamReader(System.in)).readLine();
} finally {
System.out.println("Shutting down...");
for ( WorkServer workServer : workServers ) {
try {
workServer.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
for ( ZkClient client : clients ) {
try {
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
2. Data Publish/Subscribe
Multiple subscribers listen on the same subject object at once; when the subject's state changes, every subscriber is notified and updates its own state. Publisher and subscriber are encapsulated independently and can change independently. The publish/subscribe pattern fits when changing one object requires changing other objects, and you cannot know in advance how many objects need to change.
Its flagship applications in distributed systems are configuration management and service discovery.
Configuration management: machines in a cluster hold configuration that needs to change dynamically. With publish/subscribe we can manage that configuration centrally: the machines subscribe to the configuration, and when it changes they are notified and update their own copies.
Service discovery: centrally tracking which services in the cluster are online. Each work server acts as a publisher, registering its basic information with the cluster, while monitoring servers act as subscribers to that information. When a work server's information changes, for example a service goes on or offline, or a server's role or service scope changes, the monitoring servers are notified and can react.
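Both applications boil down to the same two primitives: one side writes a znode, the other side subscribes to its changes. A minimal sketch of the subscriber side with ZkClient (path and payload are illustrative):
// Minimal subscriber sketch: react whenever the /config node's data changes.
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000, 5000, new BytesPushThroughSerializer());
zkClient.subscribeDataChanges("/config", new IZkDataListener() {
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("new config: " + new String((byte[]) data)); // update local state here
}
public void handleDataDeleted(String dataPath) throws Exception {
// the config node was removed; keep the last known configuration
}
});
// the publisher side is a single write: zkClient.writeData("/config", newConfigBytes);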
Architecture diagram
The left side is the ZooKeeper cluster and the right side is the server cluster: the first three are work servers (Work Server), the green one is the management server (Manage Server), and at the bottom is the control server (Control Server).
The config node is used for configuration management: the Manage Server pushes configuration down through the config node, and Work Servers update their own configuration by subscribing to its changes.
The Servers node is used for service discovery: each Work Server creates an ephemeral node under the Servers node at startup, and the Manage Server acts as the monitor, keeping its in-memory list of work servers current by listening for changes to the Servers node's child list.
The Control Server sends control instructions to the Manage Server with the command node as the intermediary: the Control Server writes a command into the command node, and the Manage Server subscribes to the command node's data changes to receive and execute the commands.
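The article drives the command node from the zkCli shell further below; a Control Server would do the same thing programmatically with a single write. A hypothetical sketch (the node path matches the demo, everything else is illustrative):
// Hypothetical Control Server: issuing a command is one write to the command node.
ZkClient controlClient = new ZkClient("127.0.0.1:2181", 5000, 5000, new BytesPushThroughSerializer());
if (!controlClient.exists("/command")) {
controlClient.createPersistent("/command", "list".getBytes()); // the first command creates the node
} else {
controlClient.writeData("/command", "modify".getBytes()); // later commands just overwrite it
}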
Flow chart
Main flow of the Manage Server
Core class diagram
WorkServer corresponds to the Work Server in the architecture diagram;
ManageServer corresponds to the Manage Server in the architecture diagram;
ServerConfig records a Work Server's configuration;
ServerData records a Work Server's basic information;
SubscribeZkClient is the example's entry point, responsible for starting the Work Servers and the Manage Server.
Code implementation
/**
 * Configuration information
 */
public class ServerConfig {
private String dbUrl;
private String dbPwd;
private String dbUser;
public String getDbUrl() {
return dbUrl;
}
public void setDbUrl(String dbUrl) {
this.dbUrl = dbUrl;
}
public String getDbPwd() {
return dbPwd;
}
public void setDbPwd(String dbPwd) {
this.dbPwd = dbPwd;
}
public String getDbUser() {
return dbUser;
}
public void setDbUser(String dbUser) {
this.dbUser = dbUser;
}
@Override
public String toString() {
return "ServerConfig [dbUrl=" + dbUrl + ", dbPwd=" + dbPwd
+ ", dbUser=" + dbUser + "]";
}
}
/**
 * Basic server information
 */
public class ServerData {
private String address;
private Integer id;
private String name;
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "ServerData [address=" + address + ", id=" + id + ", name="
+ name + "]";
}
}
/**
 * Represents a work server
 */
public class WorkServer {
private ZkClient zkClient;
// path of the config node in the ZooKeeper cluster
private String configPath;
// path of the servers node in the ZooKeeper cluster
private String serversPath;
// basic information of this work server
private ServerData serverData;
// configuration of this work server
private ServerConfig serverConfig;
private IZkDataListener dataListener;
public WorkServer(String configPath, String serversPath,
ServerData serverData, ZkClient zkClient, ServerConfig initConfig) {
this.zkClient = zkClient;
this.serversPath = serversPath;
this.configPath = configPath;
this.serverConfig = initConfig;
this.serverData = serverData;
this.dataListener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
}
public void handleDataChange(String dataPath, Object data)
throws Exception {
String retJson = new String((byte[])data);
ServerConfig serverConfigLocal = (ServerConfig)JSON.parseObject(retJson,ServerConfig.class);
updateConfig(serverConfigLocal);
System.out.println("new Work server config is:"+serverConfig.toString());
}
};
}
// Start the server
public void start() {
System.out.println("work server start...");
initRunning();
}
// Stop the server
public void stop() {
System.out.println("work server stop...");
zkClient.unsubscribeDataChanges(configPath, dataListener); // stop watching the config node
}
// Initialize the server
private void initRunning() {
registMe(); // register this server
zkClient.subscribeDataChanges(configPath, dataListener); // subscribe to config node change events
}
// Registers this server with ZooKeeper at startup
private void registMe() {
String mePath = serversPath.concat("/").concat(serverData.getAddress());
try {
zkClient.createEphemeral(mePath, JSON.toJSONString(serverData)
.getBytes());
} catch (ZkNoNodeException e) {
zkClient.createPersistent(serversPath, true);
registMe();
}
}
// Update this server's configuration
private void updateConfig(ServerConfig serverConfig) {
this.serverConfig = serverConfig;
}
}
public class ManageServer {
// path of the servers node in ZooKeeper
private String serversPath;
// path of the command node in ZooKeeper
private String commandPath;
// path of the config node in ZooKeeper
private String configPath;
private ZkClient zkClient;
private ServerConfig config;
// listens for changes to the servers node's child list
private IZkChildListener childListener;
// listens for changes to the command node's data
private IZkDataListener dataListener;
// list of work servers
private List<String> workServerList;
public ManageServer(String serversPath, String commandPath,
String configPath, ZkClient zkClient, ServerConfig config) {
this.serversPath = serversPath;
this.commandPath = commandPath;
this.zkClient = zkClient;
this.config = config;
this.configPath = configPath;
this.childListener = new IZkChildListener() {
public void handleChildChange(String parentPath,
List<String> currentChilds) throws Exception {
workServerList = currentChilds; // refresh the in-memory list of work servers
System.out.println("work server list changed, new list is:");
execList();
}
};
this.dataListener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
// ignore
}
public void handleDataChange(String dataPath, Object data)
throws Exception {
String cmd = new String((byte[]) data);
System.out.println("cmd:"+cmd);
exeCmd(cmd); // execute the command
}
};
}
private void initRunning() {
zkClient.subscribeDataChanges(commandPath, dataListener);
zkClient.subscribeChildChanges(serversPath, childListener);
}
/*
 * Supported commands: list, create, modify
 */
private void exeCmd(String cmdType) {
if ("list".equals(cmdType)) {
execList();
} else if ("create".equals(cmdType)) {
execCreate();
} else if ("modify".equals(cmdType)) {
execModify();
} else {
System.out.println("error command!" + cmdType);
}
}
// List the work servers
private void execList() {
System.out.println(workServerList.toString());
}
// Create the config node
private void execCreate() {
if (!zkClient.exists(configPath)) {
try {
zkClient.createPersistent(configPath, JSON.toJSONString(config)
.getBytes());
} catch (ZkNodeExistsException e) {
zkClient.writeData(configPath, JSON.toJSONString(config)
.getBytes()); // the config node already exists, so just write the data
} catch (ZkNoNodeException e) {
String parentDir = configPath.substring(0,
configPath.lastIndexOf('/'));
zkClient.createPersistent(parentDir, true);
execCreate();
}
}
}
// Modify the config node's content
private void execModify() {
// for the demo, modifying any one property of the config is enough
config.setDbUser(config.getDbUser() + "_modify");
try {
zkClient.writeData(configPath, JSON.toJSONString(config).getBytes());
} catch (ZkNoNodeException e) {
execCreate(); // the config node did not exist yet, so create it
}
}
// Start the manage server
public void start() {
initRunning();
}
// Stop the manage server
public void stop() {
zkClient.unsubscribeChildChanges(serversPath, childListener);
zkClient.unsubscribeDataChanges(commandPath, dataListener);
}
}
/**
 * Scheduler
 */
public class SubscribeZkClient {
private static final int CLIENT_QTY = 5; // number of Work Servers
private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";
private static final String CONFIG_PATH = "/config";
private static final String COMMAND_PATH = "/command";
private static final String SERVERS_PATH = "/servers";
public static void main(String[] args) throws Exception {
List<ZkClient> clients = new ArrayList<ZkClient>();
List<WorkServer> workServers = new ArrayList<WorkServer>();
ManageServer manageServer = null;
try {
// create a default configuration
ServerConfig initConfig = new ServerConfig();
initConfig.setDbPwd("123456");
initConfig.setDbUrl("jdbc:mysql://localhost:3306/mydb");
initConfig.setDbUser("root");
// instantiate a Manage Server
ZkClient clientManage = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());
manageServer = new ManageServer(SERVERS_PATH, COMMAND_PATH,CONFIG_PATH,clientManage,initConfig);
manageServer.start(); // start the Manage Server
// create the requested number of work servers
for ( int i = 0; i < CLIENT_QTY; ++i ) {
ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());
clients.add(client);
ServerData serverData = new ServerData();
serverData.setId(i);
serverData.setName("WorkServer#"+i);
serverData.setAddress("192.168.1."+i);
WorkServer workServer = new WorkServer(CONFIG_PATH, SERVERS_PATH, serverData, client, initConfig);
workServers.add(workServer);
workServer.start(); // start the work server
}
System.out.println("敲回车键退出!\n");
new BufferedReader(new InputStreamReader(System.in)).readLine();
} finally {
System.out.println("Shutting down...");
for ( WorkServer workServer : workServers ) {
try {
workServer.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
for ( ZkClient client : clients ) {
try {
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
We use ZooKeeper's command-line client, zkCli, to issue commands to the Manage Server.
Run in zkCli:
create /command list
Java console output:
cmd:list
[192.168.1.1, 192.168.1.0, 192.168.1.3, 192.168.1.2, 192.168.1.4]
Run in zkCli:
set /command create
Java console output:
cmd:create
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
Run in zkCli:
set /command modify
Java console output:
cmd:modify
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root_modify]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root_modify]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root_modify]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root_modify]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root_modify]
3. Load Balancing
Load balancing is a technique for spreading access to a resource across multiple devices, relieving the pressure on any single point.
Architecture diagram
On the left is the ZooKeeper cluster; on the upper right are the work servers, with the clients below them. Each work server registers an ephemeral node under ZooKeeper's servers node at startup. Each client, when it starts, fetches the list of available work servers from the servers node, picks one work server with a load-balancing algorithm, and opens a network connection to it. For the networking we use the open-source framework Netty.
Flow chart
Load-balancing client flow
Server main flow
Class diagram
Server-side core classes
Each server corresponds to the Server interface, with ServerImpl as its implementation. The registration performed at server startup is abstracted into the RegistProvider interface with a default implementation, DefaultRegistProvider, which uses a context class, ZooKeeperRegistContext. The server is based on Netty and needs a ServerHandler to manage connections with clients: whenever a client connects or disconnects, this server's load information must be updated, so updating the load is likewise abstracted into the BalanceUpdateProvider interface with a default implementation, DefaultBalanceUpdateProvider. ServerRunner is the scheduler class, responsible for starting our Servers.
Client-side core classes
Each client implements the Client interface, with ClientImpl as its implementation. A Client needs a ClientHandler to handle communication with the server, and a BalanceProvider to supply the load-balancing algorithm. BalanceProvider is an interface with two implementations: the abstract AbstractBalanceProvider and the default DefaultBalanceProvider. ServerData is a class shared by server and client: the server packs its basic information, including its load, into a ServerData and writes it to ZooKeeper; when computing load, the client reads the ServerData back from ZooKeeper to obtain the server's address and load. ClientRunner is the client-side scheduler class, responsible for starting the clients.
Code implementation
First, the server-side code:
public class ServerData implements Serializable,Comparable<ServerData> {
private static final long serialVersionUID = -8892569870391530906L;
private Integer balance;
private String host;
private Integer port;
public Integer getBalance() {
return balance;
}
public void setBalance(Integer balance) {
this.balance = balance;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
@Override
public String toString() {
return "ServerData [balance=" + balance + ", host=" + host + ", port="
+ port + "]";
}
public int compareTo(ServerData o) {
return this.getBalance().compareTo(o.getBalance());
}
}
public interface Server {
public void bind();
}
public class ServerImpl implements Server {
private EventLoopGroup bossGroup = new NioEventLoopGroup();
private EventLoopGroup workGroup = new NioEventLoopGroup();
private ServerBootstrap bootStrap = new ServerBootstrap();
private ChannelFuture cf;
private String zkAddress;
private String serversPath;
private String currentServerPath;
private ServerData sd;
private volatile boolean binded = false;
private final ZkClient zc;
private final RegistProvider registProvider;
private static final Integer SESSION_TIME_OUT = 10000;
private static final Integer CONNECT_TIME_OUT = 10000;
public String getCurrentServerPath() {
return currentServerPath;
}
public String getZkAddress() {
return zkAddress;
}
public String getServersPath() {
return serversPath;
}
public ServerData getSd() {
return sd;
}
public void setSd(ServerData sd) {
this.sd = sd;
}
public ServerImpl(String zkAddress, String serversPath, ServerData sd){
this.zkAddress = zkAddress;
this.serversPath = serversPath;
this.zc = new ZkClient(this.zkAddress,SESSION_TIME_OUT,CONNECT_TIME_OUT,new SerializableSerializer());
this.registProvider = new DefaultRegistProvider();
this.sd = sd;
}
// initialize the server
private void initRunning() throws Exception {
String mePath = serversPath.concat("/").concat(sd.getPort().toString());
// register with ZooKeeper
registProvider.regist(new ZooKeeperRegistContext(mePath,zc,sd));
currentServerPath = mePath;
}
public void bind() {
if (binded){
return;
}
System.out.println(sd.getPort()+":binding...");
try {
initRunning();
} catch (Exception e) {
e.printStackTrace();
return;
}
bootStrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new ServerHandler(new DefaultBalanceUpdateProvider(currentServerPath,zc)));
}
});
try {
cf = bootStrap.bind(sd.getPort()).sync();
binded = true;
System.out.println(sd.getPort()+":binded...");
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
public interface RegistProvider {
public void regist(Object context) throws Exception;
public void unRegist(Object context) throws Exception;
}
public class DefaultRegistProvider implements RegistProvider {
// Create an ephemeral node in ZooKeeper and write this server's information into it
public void regist(Object context) throws Exception {
// To register itself, the server creates an ephemeral node under the target path in
// ZooKeeper and writes its own data; the three pieces of information needed are
// wrapped into a context object and passed in:
// 1: path
// 2: zkClient
// 3: serverData
ZooKeeperRegistContext registContext = (ZooKeeperRegistContext) context;
String path = registContext.getPath();
ZkClient zc = registContext.getZkClient();
try {
zc.createEphemeral(path, registContext.getData());
} catch (ZkNoNodeException e) {
String parentDir = path.substring(0, path.lastIndexOf('/'));
zc.createPersistent(parentDir, true);
regist(registContext);
}
}
public void unRegist(Object context) throws Exception {
// nothing to do: the ephemeral node disappears automatically when the session closes
return;
}
}
public class ZooKeeperRegistContext {
private String path;
private ZkClient zkClient;
private Object data;
public ZooKeeperRegistContext(String path, ZkClient zkClient, Object data) {
super();
this.path = path;
this.zkClient = zkClient;
this.data = data;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
public ZkClient getZkClient() {
return zkClient;
}
public void setZkClient(ZkClient zkClient) {
this.zkClient = zkClient;
}
public Object getData() {
return data;
}
public void setData(Object data) {
this.data = data;
}
}
/**
 * Handles communication between the server and its clients
 */
public class ServerHandler extends ChannelHandlerAdapter{
private final BalanceUpdateProvider balanceUpdater;
private static final Integer BALANCE_STEP = 1;
public ServerHandler(BalanceUpdateProvider balanceUpdater){
this.balanceUpdater = balanceUpdater;
}
public BalanceUpdateProvider getBalanceUpdater() {
return balanceUpdater;
}
// increase the load when a client connects
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("one client connect...");
balanceUpdater.addBalance(BALANCE_STEP);
}
// decrease the load when a client disconnects
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
balanceUpdater.reduceBalance(BALANCE_STEP);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
public interface BalanceUpdateProvider {
// increase the load
public boolean addBalance(Integer step);
// decrease the load
public boolean reduceBalance(Integer step);
}
public class DefaultBalanceUpdateProvider implements BalanceUpdateProvider {
private String serverPath;
private ZkClient zc;
public DefaultBalanceUpdateProvider(String serverPath, ZkClient zkClient) {
this.serverPath = serverPath;
this.zc = zkClient;
}
public boolean addBalance(Integer step) {
Stat stat = new Stat();
ServerData sd;
// Increase the load: read this server's ServerData, bump the balance, and write it back to ZooKeeper
while (true) {
try {
sd = zc.readData(this.serverPath, stat);
sd.setBalance(sd.getBalance() + step);
// write with the version from the read: another client may have connected and changed the load in the meantime
zc.writeData(this.serverPath, sd, stat.getVersion());
return true;
} catch (ZkBadVersionException e) {
// ignore
} catch (Exception e) {
return false;
}
}
}
public boolean reduceBalance(Integer step) {
Stat stat = new Stat();
ServerData sd;
while (true) {
try {
sd = zc.readData(this.serverPath, stat);
final Integer currBalance = sd.getBalance();
sd.setBalance(currBalance>step?currBalance-step:0);
zc.writeData(this.serverPath, sd, stat.getVersion());
return true;
} catch (ZkBadVersionException e) {
// ignore
} catch (Exception e) {
return false;
}
}
}
}
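Passing stat.getVersion() to writeData turns each update into a compare-and-set: ZooKeeper rejects the write if another session modified the node since our read, and the loop retries. The same pattern against the raw ZooKeeper API would look roughly like this (a sketch; zk, the path, and the bump helper are assumptions, not the article's code):
// Sketch of the same optimistic loop with the native ZooKeeper client.
Stat stat = new Stat();
while (true) {
try {
byte[] raw = zk.getData("/servers/6000", false, stat); // read the data and remember its version
byte[] updated = bump(raw); // hypothetical helper: deserialize, adjust balance, serialize
zk.setData("/servers/6000", updated, stat.getVersion()); // fails if the version has moved on
break;
} catch (KeeperException.BadVersionException e) {
// lost the race with another writer: re-read and retry
}
}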
/**
 * For testing: starts the Work Servers
 */
public class ServerRunner {
private static final int SERVER_QTY = 2;
private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";
private static final String SERVERS_PATH = "/servers";
public static void main(String[] args) {
List<Thread> threadList = new ArrayList<Thread>();
for(int i=0; i<SERVER_QTY; i++){
final Integer count = i;
Thread thread = new Thread(new Runnable() {
public void run() {
ServerData sd = new ServerData();
sd.setBalance(0);
sd.setHost("127.0.0.1");
sd.setPort(6000+count);
Server server = new ServerImpl(ZOOKEEPER_SERVER,SERVERS_PATH,sd);
server.bind();
}
});
threadList.add(thread);
thread.start();
}
for (int i=0; i<threadList.size(); i++){
try {
threadList.get(i).join();
} catch (InterruptedException ignore) {
//
}
}
}
}
Now the client-side code:
public interface Client {
// connect to a server
public void connect() throws Exception;
// disconnect from the server
public void disConnect() throws Exception;
}
public class ClientImpl implements Client {
private final BalanceProvider<ServerData> provider;
private EventLoopGroup group = null;
private Channel channel = null;
private final Logger log = LoggerFactory.getLogger(getClass());
public ClientImpl(BalanceProvider<ServerData> provider) {
this.provider = provider;
}
public BalanceProvider<ServerData> getProvider() {
return provider;
}
public void connect(){
try{
ServerData serverData = provider.getBalanceItem(); // fetch the least-loaded server's information and connect to it
System.out.println("connecting to "+serverData.getHost()+":"+serverData.getPort()+", its balance: "+serverData.getBalance());
group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new ClientHandler());
}
});
ChannelFuture f = b.connect(serverData.getHost(),serverData.getPort()).syncUninterruptibly();
channel = f.channel();
System.out.println("started success!");
}catch(Exception e){
System.out.println("连接异常:"+e.getMessage());
}
}
public void disConnect(){
try{
if (channel!=null)
channel.close().syncUninterruptibly();
group.shutdownGracefully();
group = null;
log.debug("disconnected!");
}catch(Exception e){
log.error(e.getMessage());
}
}
}
public class ClientHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
public interface BalanceProvider<T> {
public T getBalanceItem();
}
public abstract class AbstractBalanceProvider<T> implements BalanceProvider<T> {
protected abstract T balanceAlgorithm(List<T> items);
protected abstract List<T> getBalanceItems();
public T getBalanceItem(){
return balanceAlgorithm(getBalanceItems());
}
}
public class DefaultBalanceProvider extends AbstractBalanceProvider<ServerData> {
private final String zkServer; // ZooKeeper server address
private final String serversPath; // path of the servers node
private final ZkClient zc;
private static final Integer SESSION_TIME_OUT = 10000;
private static final Integer CONNECT_TIME_OUT = 10000;
public DefaultBalanceProvider(String zkServer, String serversPath) {
this.serversPath = serversPath;
this.zkServer = zkServer;
this.zc = new ZkClient(this.zkServer, SESSION_TIME_OUT, CONNECT_TIME_OUT,
new SerializableSerializer());
}
@Override
protected ServerData balanceAlgorithm(List<ServerData> items) {
if (items.size()>0){
Collections.sort(items); // sort by load in ascending order
return items.get(0); // return the least-loaded server
}else{
return null;
}
}
/**
 * Fetch the basic information of every work server from ZooKeeper
 */
@Override
protected List<ServerData> getBalanceItems() {
List<ServerData> sdList = new ArrayList<ServerData>();
List<String> children = zc.getChildren(this.serversPath);
for(int i=0; i<children.size();i++){
ServerData sd = zc.readData(serversPath+"/"+children.get(i));
sdList.add(sd);
}
return sdList;
}
}
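The provider abstraction exists so that the pick strategy can be swapped without touching ClientImpl. As an illustration, a random-pick variant (hypothetical, not part of the article's code) only has to fill in the two abstract methods:
// Hypothetical alternative strategy: pick a random server instead of the least-loaded one.
public class RandomBalanceProvider extends AbstractBalanceProvider<ServerData> {
private final ZkClient zc;
private final String serversPath;
private final Random random = new Random();
public RandomBalanceProvider(ZkClient zc, String serversPath) {
this.zc = zc;
this.serversPath = serversPath;
}
@Override
protected ServerData balanceAlgorithm(List<ServerData> items) {
// uniform random choice; ignores the balance field entirely
return items.isEmpty() ? null : items.get(random.nextInt(items.size()));
}
@Override
protected List<ServerData> getBalanceItems() {
// same fetch as DefaultBalanceProvider: one ServerData per child of the servers node
List<ServerData> sdList = new ArrayList<ServerData>();
for (String child : zc.getChildren(serversPath)) {
ServerData sd = zc.readData(serversPath + "/" + child);
sdList.add(sd);
}
return sdList;
}
}
ClientRunner could then hand ClientImpl a RandomBalanceProvider in place of the default provider, with no other changes.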
public class ClientRunner {
private static final int CLIENT_QTY = 3;
private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";
private static final String SERVERS_PATH = "/servers";
public static void main(String[] args) {
List<Thread> threadList = new ArrayList<Thread>(CLIENT_QTY);
final List<Client> clientList = new ArrayList<Client>();
final BalanceProvider<ServerData> balanceProvider = new DefaultBalanceProvider(ZOOKEEPER_SERVER, SERVERS_PATH);
try{
for(int i=0; i<CLIENT_QTY; i++){
Thread thread = new Thread(new Runnable() {
public void run() {
Client client = new ClientImpl(balanceProvider);
clientList.add(client);
try {
client.connect();
} catch (Exception e) {
e.printStackTrace();
}
}
});
threadList.add(thread);
thread.start();
// stagger the client startups
Thread.sleep(2000);
}
System.out.println("敲回车键退出!\n");
new BufferedReader(new InputStreamReader(System.in)).readLine();
}catch(Exception e){
e.printStackTrace();
}finally{
// shut down the clients
for (int i=0; i<clientList.size(); i++){
try {
clientList.get(i).disConnect();
} catch (Exception ignore) {
//ignore
}
}
// stop the threads
for (int i=0; i<threadList.size(); i++){
threadList.get(i).interrupt();
try{
threadList.get(i).join();
}catch (InterruptedException e){
//ignore
}
}
}
}
}
First, start the server side, ServerRunner:
6000:binding...
6000:binded...
6001:binding...
6001:binded...
Then start the client side, ClientRunner:
connecting to 127.0.0.1:6000, its balance: 0
started success!
connecting to 127.0.0.1:6001, its balance: 0
started success!
connecting to 127.0.0.1:6000, its balance: 1
started success!
Press Enter to exit!
4. Distributed Lock
The locks we usually talk about are single-process, multi-thread locks: in concurrent programming they synchronize data between threads and protect access to shared resources. A distributed lock does the same job in a distributed environment: it protects shared resources across processes, hosts, and networks, enforcing mutually exclusive access and guaranteeing consistency.
Architecture diagram
On the left is the ZooKeeper cluster: locker is a data node, and node_1 through node_n represent a series of sequential nodes.
On the right, client_1 through client_n are the clients, and Service is the service that must be accessed under mutual exclusion.
The overall idea: to acquire the lock, create a sequential node under the locker node; to release the lock, delete the node you created.
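In other words, the lock queue is just the sorted child list of the locker node, and the head of that list holds the lock. A minimal sketch of that invariant (assuming the /locker node already exists):
// Each contender creates an ephemeral sequential node; the smallest sequence number holds the lock.
ZkClient zc = new ZkClient("127.0.0.1:2181", 5000, 5000, new BytesPushThroughSerializer());
String mine = zc.createEphemeralSequential("/locker/lock-", null); // e.g. /locker/lock-0000000003
List<String> children = zc.getChildren("/locker"); // e.g. [lock-0000000002, lock-0000000003]
Collections.sort(children); // same-prefix names sort by sequence number
boolean haveTheLock = mine.equals("/locker/" + children.get(0)); // head of the queue owns the lock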
Flow chart
Class diagram
Code implementation
public interface DistributedLock {
/*
 * Acquire the lock; if it is unavailable, wait until it is acquired
 */
public void acquire() throws Exception;
/*
 * Acquire the lock, giving up once the timeout elapses
 */
public boolean acquire(long time, TimeUnit unit) throws Exception;
/*
 * Release the lock
 */
public void release() throws Exception;
}
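The interface is meant to be used with the usual acquire/try/finally idiom, so the lock is released even when the critical section throws (a sketch; ZkClientExt and the /Mutex path appear in the test code below):
DistributedLock lock = new SimpleDistributedLockMutex(zkClientExt, "/Mutex");
lock.acquire();
try {
// access the shared resource while holding the lock
} finally {
lock.release(); // always release, even if the critical section throws
}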
public class SimpleDistributedLockMutex extends BaseDistributedLock implements
DistributedLock {
// lock name prefix; successfully created sequential nodes look like lock-0000000000, lock-0000000001, ...
private static final String LOCK_NAME = "lock-";
// path of the locker node in ZooKeeper
private final String basePath;
// path of the sequential node we created when we acquired the lock
private String ourLockPath;
private boolean internalLock(long time, TimeUnit unit) throws Exception {
ourLockPath = attemptLock(time, unit);
return ourLockPath != null;
}
public SimpleDistributedLockMutex(ZkClientExt client, String basePath){
super(client,basePath,LOCK_NAME);
this.basePath = basePath;
}
// Acquire the lock
public void acquire() throws Exception {
if ( !internalLock(-1, null) ) {
throw new IOException("连接丢失!在路径:'"+basePath+"'下不能获取锁!");
}
}
// Acquire the lock, with a timeout
public boolean acquire(long time, TimeUnit unit) throws Exception {
return internalLock(time, unit);
}
// Release the lock
public void release() throws Exception {
releaseLock(ourLockPath);
}
}
public class BaseDistributedLock {
private final ZkClientExt client;
private final String path;
private final String basePath;
private final String lockName;
private static final Integer MAX_RETRY_COUNT = 10;
public BaseDistributedLock(ZkClientExt client, String path, String lockName){
this.client = client;
this.basePath = path;
this.path = path.concat("/").concat(lockName);
this.lockName = lockName;
}
// Delete the sequential node we created when acquiring the lock
private void deleteOurPath(String ourPath) throws Exception{
client.delete(ourPath);
}
// Create an ephemeral sequential node
private String createLockNode(ZkClient client, String path) throws Exception{
return client.createEphemeralSequential(path, null);
}
// Wait for the deletion of the sequential node immediately ahead of ours
private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception{
boolean haveTheLock = false;
boolean doDelete = false;
try {
while ( !haveTheLock ) {
// fetch the sorted child list of /locker
List<String> children = getSortedChildren();
// the name of the sequential node we created
String sequenceNodeName = ourPath.substring(basePath.length()+1);
// find our position in the queue
int ourIndex = children.indexOf(sequenceNodeName);
if (ourIndex < 0){ // network jitter: the fetched child list may no longer contain our node
throw new ZkNoNodeException("node not found: " + sequenceNodeName);
}
// if we are first, we already hold the lock
boolean isGetTheLock = ourIndex == 0;
// otherwise we must watch the node immediately ahead of ours
String pathToWatch = isGetTheLock ? null : children.get(ourIndex - 1);
if ( isGetTheLock ){
haveTheLock = true;
} else {
// subscribe to the delete event of the node just ahead of ours
String previousSequencePath = basePath.concat("/").concat(pathToWatch);
final CountDownLatch latch = new CountDownLatch(1);
final IZkDataListener previousListener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
latch.countDown(); // the node was deleted: end the await on the latch
}
public void handleDataChange(String dataPath, Object data) throws Exception {
// ignore
}
};
try {
// subscribe to the delete event of the node ahead; this throws if the node no longer exists
client.subscribeDataChanges(previousSequencePath, previousListener);
if ( millisToWait != null ) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 ) {
doDelete = true; // timed out - delete our node
break;
}
latch.await(millisToWait, TimeUnit.MILLISECONDS); // wait on the latch for at most the remaining time
} else {
latch.await(); // wait on the latch
}
// once the wait ends, loop again and re-check whether our node is now first
}
catch ( ZkNoNodeException e ) {
//ignore
} finally {
client.unsubscribeDataChanges(previousSequencePath, previousListener);
}
}
}
}
catch ( Exception e ) {
// on any exception we must delete our node
doDelete = true;
throw e;
} finally {
// delete our node if required
if ( doDelete ) {
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
private String getLockNodeNumber(String str, String lockName) {
int index = str.lastIndexOf(lockName);
if ( index >= 0 ) {
index += lockName.length();
return index <= str.length() ? str.substring(index) : "";
}
return str;
}
// Fetch the sorted child list of /locker
List<String> getSortedChildren() throws Exception {
try{
List<String> children = client.getChildren(basePath);
Collections.sort(
children, new Comparator<String>() {
public int compare(String lhs, String rhs) {
return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName));
}
}
);
return children;
} catch (ZkNoNodeException e){
client.createPersistent(basePath, true);
return getSortedChildren();
}
}
protected void releaseLock(String lockPath) throws Exception{
deleteOurPath(lockPath);
}
protected String attemptLock(long time, TimeUnit unit) throws Exception {
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
int retryCount = 0;
// retry in case of a transient network failure
while ( !isDone ) {
isDone = true;
try {
// create an ephemeral sequential node under /locker
ourPath = createLockNode(client, path);
// check whether we hold the lock; if not, wait until we acquire it or time out
hasTheLock = waitToLock(startMillis, millisToWait, ourPath);
} catch ( ZkNoNodeException e ) { // our node vanished from the child list, e.g. after a session loss
if ( retryCount++ < MAX_RETRY_COUNT ) { // retry a bounded number of times
isDone = false;
} else {
throw e;
}
}
}
if ( hasTheLock ) {
return ourPath;
}
return null;
}
}
public class TestDistributedLock {
public static void main(String[] args) {
final ZkClientExt zkClientExt1 = new ZkClientExt("192.168.1.105:2181", 5000, 5000, new BytesPushThroughSerializer());
final SimpleDistributedLockMutex mutex1 = new SimpleDistributedLockMutex(zkClientExt1, "/Mutex");
final ZkClientExt zkClientExt2 = new ZkClientExt("192.168.1.105:2181", 5000, 5000, new BytesPushThroughSerializer());
final SimpleDistributedLockMutex mutex2 = new SimpleDistributedLockMutex(zkClientExt2, "/Mutex");
try {
mutex1.acquire();
System.out.println("Client1 locked");
Thread client2Thd = new Thread(new Runnable() {
public void run() {
try {
mutex2.acquire();
System.out.println("Client2 locked");
mutex2.release();
System.out.println("Client2 released lock");
} catch (Exception e) {
e.printStackTrace();
}
}
});
client2Thd.start();
Thread.sleep(5000);
mutex1.release();
System.out.println("Client1 released lock");
client2Thd.join();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ZkClientExt extends ZkClient {
public ZkClientExt(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer) {
super(zkServers, sessionTimeout, connectionTimeout, zkSerializer);
}
@Override
public void watchForData(final String path) {
// register the data watch by reading the node (and its Stat) directly
retryUntilConnected(new Callable<Object>() {
public Object call() throws Exception {
Stat stat = new Stat();
_connection.readData(path, stat, true);
return null;
}
});
}
}
5. Distributed Queue
In traditional single-process programming, we use queues to store data structures and to share or pass data between threads.
In a distributed environment we likewise need a queue-like component for sharing and passing data across processes, hosts, and networks: the distributed queue.
ZooKeeper can implement a distributed queue with its sequential nodes.
Architecture diagram
The left side is the ZooKeeper cluster; the right side shows the consumers and producers.
Producers enqueue data by creating sequential nodes under the queue node; consumers consume data by reading, and then deleting, those sequential nodes.
Flow chart
Core offer flow
Core poll flow
Code implementation
/**
 * Simple distributed queue
 */
public class DistributedSimpleQueue<T> {
protected final ZkClient zkClient;
// the queue node
protected final String root;
// prefix for the sequential node names
protected static final String Node_NAME = "n_";
public DistributedSimpleQueue(ZkClient zkClient, String root) {
this.zkClient = zkClient;
this.root = root;
}
// Size of the queue
public int size() {
return zkClient.getChildren(root).size();
}
// Whether the queue is empty
public boolean isEmpty() {
return zkClient.getChildren(root).size() == 0;
}
// Offer an element to the queue
public boolean offer(T element) throws Exception{
// create a sequential node under the queue node
String nodeFullPath = root .concat( "/" ).concat( Node_NAME );
try {
zkClient.createPersistentSequential(nodeFullPath , element);
}catch (ZkNoNodeException e) {
zkClient.createPersistent(root);
offer(element);
} catch (Exception e) {
throw ExceptionUtil.convertToRuntimeException(e);
}
return true;
}
// Take an element from the queue
public T poll() throws Exception {
try {
// fetch all the sequential nodes
List<String> list = zkClient.getChildren(root);
if (list.size() == 0) {
return null;
}
// sort by sequence number
Collections.sort(list, new Comparator<String>() {
public int compare(String lhs, String rhs) {
return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));
}
});
// iterate over the sorted node names
for ( String nodeName : list ){
// build the node's full path
String nodeFullPath = root.concat("/").concat(nodeName);
try {
// read the node's content
T node = (T) zkClient.readData(nodeFullPath);
// delete the node
zkClient.delete(nodeFullPath);
return node;
} catch (ZkNoNodeException e) {
// ignore: another client consumed this node first
}
}
return null;
} catch (Exception e) {
throw ExceptionUtil.convertToRuntimeException(e);
}
}
private String getNodeNumber(String str, String nodeName) {
int index = str.lastIndexOf(nodeName);
if (index >= 0) {
index += nodeName.length();
return index <= str.length() ? str.substring(index) : "";
}
return str;
}
}
public class User implements Serializable {
String name;
String id;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}
public class TestDistributedSimpleQueue {
public static void main(String[] args) {
ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer());
DistributedSimpleQueue<User> queue = new DistributedSimpleQueue<User>(zkClient,"/Queue");
User user1 = new User();
user1.setId("1");
user1.setName("xiao wang");
User user2 = new User();
user2.setId("2");
user2.setName("xiao wang");
try {
queue.offer(user1);
queue.offer(user2);
User u1 = (User) queue.poll();
User u2 = (User) queue.poll();
if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){
System.out.println("Success!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
The above is a simple distributed queue. Building on it, we can extend it into a blocking distributed queue. The code follows:
/**
 * Blocking distributed queue
 * Extends the simple queue: when no element is available, poll blocks until one arrives
 */
public class DistributedBlockingQueue<T> extends DistributedSimpleQueue<T>{
public DistributedBlockingQueue(ZkClient zkClient, String root) {
super(zkClient, root);
}
@Override
public T poll() throws Exception {
while (true){ // whenever a wait on the latch ends, try again
final CountDownLatch latch = new CountDownLatch(1);
final IZkChildListener childListener = new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> currentChilds)
throws Exception {
latch.countDown(); // the queue changed: end the wait on the latch
}
};
zkClient.subscribeChildChanges(root, childListener);
try{
T node = super.poll(); // try to take an element
if ( node != null ){
return node;
} else {
latch.await(); // nothing available: wait on the latch
}
} finally {
zkClient.unsubscribeChildChanges(root, childListener);
}
}
}
}
public class TestDistributedBlockingQueue {
public static void main(String[] args) {
ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
int delayTime = 5;
ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer());
final DistributedBlockingQueue<User> queue = new DistributedBlockingQueue<User>(zkClient,"/Queue");
final User user1 = new User();
user1.setId("1");
user1.setName("xiao wang");
final User user2 = new User();
user2.setId("2");
user2.setName("xiao wang");
try {
delayExector.schedule(new Runnable() {
public void run() {
try {
queue.offer(user1);
queue.offer(user2);
} catch (Exception e) {
e.printStackTrace();
}
}
}, delayTime , TimeUnit.SECONDS);
System.out.println("ready poll!");
User u1 = (User) queue.poll();
User u2 = (User) queue.poll();
if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){
System.out.println("Success!");
}
} catch (Exception e) {
e.printStackTrace();
} finally{
delayExector.shutdown();
try {
delayExector.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
}
}
}
6. Distributed Naming Service
ZooKeeper's naming service has two application directions. The first is to provide JNDI-like functionality: ZooKeeper's hierarchical tree structure can hold the names, addresses, and directory information of the system's services, to be read from ZooKeeper whenever they are needed.
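A minimal sketch of this first direction: publish a service's address under a well-known name, then resolve the name when needed (the path and address are made up for illustration):
// Publish: store the service's address under a well-known name.
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000, 5000, new SerializableSerializer());
zkClient.createPersistent("/NameService/services", true); // create parents if missing
zkClient.createPersistent("/NameService/services/orderService", "192.168.1.20:8080");
// Lookup: resolve the name to an address.
String address = zkClient.readData("/NameService/services/orderService");
System.out.println("orderService is at " + address);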
The second is to use ZooKeeper's sequential nodes to build a distributed ID generator. Anyone who has written database applications knows that when inserting a record we usually need a unique ID for it. On a single machine the database's auto-increment primary key does the job, but in a distributed environment it no longer works. UUIDs are an option, but their drawback is that they are unordered and hard to read. With sequential nodes we can generate serial numbers that are ordered, easy to understand, and safe in a distributed environment.
Code implementation
public class IdMaker {
private ZkClient client = null;
private final String server;
// parent node of the sequential nodes in ZooKeeper
private final String root;
// name used for the sequential nodes
private final String nodeName;
// whether the service is currently running
private volatile boolean running = false;
private ExecutorService cleanExector = null;
public enum RemoveMethod{
NONE,IMMEDIATELY,DELAY
}
public IdMaker(String zkServer,String root,String nodeName){
this.root = root;
this.server = zkServer;
this.nodeName = nodeName;
}
// Start the service
public void start() throws Exception {
if (running)
throw new Exception("server has already started...");
running = true;
init();
}
// Stop the service
public void stop() throws Exception {
if (!running)
throw new Exception("server has already stopped...");
running = false;
freeResource();
}
// Initialize the service's resources
private void init(){
client = new ZkClient(server,5000,5000,new BytesPushThroughSerializer());
cleanExector = Executors.newFixedThreadPool(10);
try{
client.createPersistent(root,true);
}catch (ZkNodeExistsException e){
//ignore;
}
}
// Release the service's resources
private void freeResource(){
// shut down the thread pool
cleanExector.shutdown();
try{
cleanExector.awaitTermination(2, TimeUnit.SECONDS);
}catch(InterruptedException e){
e.printStackTrace();
}finally{
cleanExector = null;
}
if (client!=null){
client.close();
client=null;
}
}
// Check whether the service is running
private void checkRunning() throws Exception {
if (!running)
throw new Exception("call start() first");
}
// Extract the ID value from the sequential node name
private String extractId(String str){
int index = str.lastIndexOf(nodeName);
if (index >= 0){
index+=nodeName.length();
return index <= str.length()?str.substring(index):"";
}
return str;
}
// Generate an ID
public String generateId(RemoveMethod removeMethod) throws Exception{
checkRunning();
// build the full path for the sequential node
final String fullNodePath = root.concat("/").concat(nodeName);
// create a persistent sequential node
final String ourPath = client.createPersistentSequential(fullNodePath, null);
// to keep sequential nodes from piling up in ZooKeeper, delete the node right after it is created
if (removeMethod.equals(RemoveMethod.IMMEDIATELY)){ // delete immediately
client.delete(ourPath);
}else if (removeMethod.equals(RemoveMethod.DELAY)){ // delete later
cleanExector.execute(new Runnable() { // delete on the thread pool so generateId() can return quickly
public void run() {
client.delete(ourPath);
}
});
}
//node-0000000000, node-0000000001
return extractId(ourPath);
}
}
public class TestIdMaker {
public static void main(String[] args) throws Exception {
IdMaker idMaker = new IdMaker("192.168.1.105:2181",
"/NameService/IdGen", "ID");
idMaker.start();
try {
for (int i = 0; i < 10; i++) {
String id = idMaker.generateId(RemoveMethod.DELAY);
System.out.println(id);
}
} finally {
idMaker.stop();
}
}
}
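ZooKeeper renders sequence numbers as ten-digit zero-padded counters, so the printed IDs take this form (the actual starting value depends on how many sequential nodes were ever created under the parent, since the counter persists across runs):
0000000000
0000000001
...
0000000009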