1. master选举
考虑7*24小时向外提供服务的系统,不能有单点故障,于是我们使用集群,采用的是Master+Slave。集群中有一台主机和多台备机,由主机向外提供服务,备机监听主机状态,一旦主机宕机,备机必须迅速接管主机继续向外提供服务。在这个过程中,从备机选出一台机作为主机的过程,就是Master选举。
架构图

左边是ZooKeeper集群,右边是3台工作服务器。工作服务器启动时,会去ZooKeeper的Servers节点下创建临时节点,并把基本信息写入临时节点。这个过程叫服务注册,系统中的其他服务可以通过获取Servers节点的子节点列表,来了解当前系统哪些服务器可用,这个过程叫做服务发现。接着这些服务器会尝试创建Master临时节点,谁创建成功谁就是Master,其他的两台就作为Slave。所有的Work Server必须关注Master节点的删除事件。通过监听Master节点的删除事件,来了解Master服务器是否宕机(创建临时节点的服务器一旦宕机,它所创建的临时节点即会自动删除)。一旦Master服务器宕机,必须开始新一轮的Master选举。
流程图

核心类图

WorkServer对应架构图的WorkServer,是主工作类;
RunningData用来描述WorkServer的基本信息;
LeaderSelectorZkClient作为调度器来启动和停止WorkServer;
代码实现
/**
 * Basic information describing one work server: a numeric id and a name.
 * Serializable so that it can be handed to a serializer as node data.
 */
public class RunningData implements Serializable {

    private static final long serialVersionUID = 4260577459043203630L;

    /** Numeric identifier of the server. */
    private Long cid;

    /** Display name of the server. */
    private String name;

    /** @return the server name, or {@code null} if none was set */
    public String getName() {
        return this.name;
    }

    /** @param name the server name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the server id, or {@code null} if none was set */
    public Long getCid() {
        return this.cid;
    }

    /** @param cid the server id to store */
    public void setCid(Long cid) {
        this.cid = cid;
    }
}
/*** 工作服务器*/public class WorkServer {// 记录服务器状态private volatile boolean running = false;private ZkClient zkClient;// Master节点对应zookeeper中的节点路径private static final String MASTER_PATH = "/master";// 监听Master节点删除事件private IZkDataListener dataListener;// 记录当前节点的基本信息private RunningData serverData;// 记录集群中Master节点的基本信息private RunningData masterData;private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);private int delayTime = 5;public WorkServer(RunningData rd) {this.serverData = rd; // 记录服务器基本信息this.dataListener = new IZkDataListener() {public void handleDataDeleted(String dataPath) throws Exception {//takeMaster();if (masterData != null && masterData.getName().equals(serverData.getName())){// 自己就是上一轮的Master服务器,则直接抢takeMaster();} else {// 否则,延迟5秒后再抢。主要是应对网络抖动,给上一轮的Master服务器优先抢占master的权利,避免不必要的数据迁移开销delayExector.schedule(new Runnable(){public void run(){takeMaster();}}, delayTime, TimeUnit.SECONDS);}}public void handleDataChange(String dataPath, Object data)throws Exception {}};}public ZkClient getZkClient() {return zkClient;}public void setZkClient(ZkClient zkClient) {this.zkClient = zkClient;}// 启动服务器public void start() throws Exception {if (running) {throw new Exception("server has startup...");}running = true;// 订阅Master节点删除事件zkClient.subscribeDataChanges(MASTER_PATH, dataListener);// 争抢Master权利takeMaster();}// 停止服务器public void stop() throws Exception {if (!running) {throw new Exception("server has stoped");}running = false;delayExector.shutdown();// 取消Master节点事件订阅zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);// 释放Master权利releaseMaster();}// 争抢Masterprivate void takeMaster() {if (!running)return;try {// 尝试创建Master临时节点zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL);masterData = serverData;System.out.println(serverData.getName()+" is master");// 作为演示,我们让服务器每隔5秒释放一次Master权利delayExector.schedule(new Runnable() {public void run() {// TODO Auto-generated method stubif (checkMaster()){releaseMaster();}}}, 5, 
TimeUnit.SECONDS);} catch (ZkNodeExistsException e) { // 已被其他服务器创建了// 读取Master节点信息RunningData runningData = zkClient.readData(MASTER_PATH, true);if (runningData == null) {takeMaster(); // 没读到,读取瞬间Master节点宕机了,有机会再次争抢} else {masterData = runningData;}} catch (Exception e) {// ignore;}}// 释放Master权利private void releaseMaster() {if (checkMaster()) {zkClient.delete(MASTER_PATH);}}// 检测自己是否为Masterprivate boolean checkMaster() {try {RunningData eventData = zkClient.readData(MASTER_PATH);masterData = eventData;if (masterData.getName().equals(serverData.getName())) {return true;}return false;} catch (ZkNoNodeException e) {return false; // 节点不存在,自己肯定不是Master了} catch (ZkInterruptedException e) {return checkMaster();} catch (ZkException e) {return false;}}}
/*** 调度器*/public class LeaderSelectorZkClient {//启动的服务个数private static final int CLIENT_QTY = 10;//zookeeper服务器的地址private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";public static void main(String[] args) throws Exception {//保存所有zkClient的列表List<ZkClient> clients = new ArrayList<ZkClient>();//保存所有服务的列表List<WorkServer> workServers = new ArrayList<WorkServer>();try {for ( int i = 0; i < CLIENT_QTY; ++i ) { // 模拟创建10个服务器并启动//创建zkClientZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer());clients.add(client);//创建serverDataRunningData runningData = new RunningData();runningData.setCid(Long.valueOf(i));runningData.setName("Client #" + i);//创建服务WorkServer workServer = new WorkServer(runningData);workServer.setZkClient(client);workServers.add(workServer);workServer.start();}System.out.println("敲回车键退出!\n");new BufferedReader(new InputStreamReader(System.in)).readLine();} finally {System.out.println("Shutting down...");for ( WorkServer workServer : workServers ) {try {workServer.stop();} catch (Exception e) {e.printStackTrace();}}for ( ZkClient client : clients ) {try {client.close();} catch (Exception e) {e.printStackTrace();}}}}}
2. 数据发布订阅
多个订阅者对象同时监听同一主题对象,主题对象状态变化时通知所有订阅者对象更新自身状态。发布方和订阅方独立封装、独立改变,当一个对象的改变需要同时改变其他对象,并且它不知道有多少个对象需要改变时,可以使用发布订阅模式。
在分布式系统中,发布订阅模式的典型应用有配置管理和服务发现。
配置管理:指集群中的机器拥有某些配置,并且这些配置信息需要动态地改变,那么我们就可以使用发布订阅模式把配置做统一的管理,让这些机器订阅配置信息的改变,当配置改变时这些机器得到通知并更新自己的配置。
服务发现:指对集群中的服务上下线做统一管理,每个工作服务器都可以作为数据的发布方,向集群注册自己的基本信息,而让某些监控服务器作为订阅方,订阅工作服务器的基本信息。当工作服务器的基本信息改变时,如服务上下线、服务器的角色或服务范围变更,那么监控服务器可以得到通知并响应这些变化。
架构图

左边代表Zookeeper集群,右侧代表服务器集群。其中前3个为工作服务器Work Server,绿色为管理服务器Manage Server,最下面的是控制服务器Control Server。
config节点,用于配置管理。Manage Server通过config节点下发配置信息,Work Server可以通过订阅config节点的改变来更新自己的配置。
Servers节点,用于服务发现,每个Work Server在启动时都会在Servers节点下创建一个临时节点,Manage Server充当monitor,通过监听Servers节点的子节点列表的变化来更新自己内存中工作服务器列表的信息。
通过Control Server由Command节点作为中介,向Manage Server发送控制指令。Control Server向command节点写入命令信息,Manage Server订阅command节点的数据改变来监听并执行命令。
流程图
Manage Server程序主体流程

核心类图

WorkServer对应架构图的Work Server;
ManageServer对应架构图的Manage Server;
ServerConfig用于记录Work Server的配置信息;
ServerData用于记录Work Server的基本信息;
SubscribeZkClient作为示例程序的入口,负责启动Work Server和Manage Server。
实现代码
/**
 * Configuration pushed to work servers: database URL, user and password.
 *
 * <p>Note: {@link #toString()} includes the password in clear text; that is
 * acceptable for this demo only.
 */
public class ServerConfig {

    private String dbUrl;
    private String dbPwd;
    private String dbUser;

    public String getDbUrl() {
        return dbUrl;
    }

    public void setDbUrl(String dbUrl) {
        this.dbUrl = dbUrl;
    }

    public String getDbPwd() {
        return dbPwd;
    }

    public void setDbPwd(String dbPwd) {
        this.dbPwd = dbPwd;
    }

    public String getDbUser() {
        return dbUser;
    }

    public void setDbUser(String dbUser) {
        this.dbUser = dbUser;
    }

    /** @return all three properties in {@code ServerConfig [dbUrl=..., dbPwd=..., dbUser=...]} form */
    @Override
    public String toString() {
        return "ServerConfig [dbUrl=" + dbUrl + ", dbPwd=" + dbPwd
                + ", dbUser=" + dbUser + "]";
    }
}
/**
 * Basic information about a work server: address, numeric id and name.
 */
public class ServerData {

    private String address;
    private Integer id;
    private String name;

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    /** @return all three properties in {@code ServerData [address=..., id=..., name=...]} form */
    @Override
    public String toString() {
        return "ServerData [address=" + address + ", id=" + id + ", name="
                + name + "]";
    }
}
/*** 代表工作服务器*/public class WorkServer {private ZkClient zkClient;// ZooKeeperprivate String configPath;// ZooKeeper集群中servers节点的路径private String serversPath;// 当前工作服务器的基本信息private ServerData serverData;// 当前工作服务器的配置信息private ServerConfig serverConfig;private IZkDataListener dataListener;public WorkServer(String configPath, String serversPath,ServerData serverData, ZkClient zkClient, ServerConfig initConfig) {this.zkClient = zkClient;this.serversPath = serversPath;this.configPath = configPath;this.serverConfig = initConfig;this.serverData = serverData;this.dataListener = new IZkDataListener() {public void handleDataDeleted(String dataPath) throws Exception {}public void handleDataChange(String dataPath, Object data)throws Exception {String retJson = new String((byte[])data);ServerConfig serverConfigLocal = (ServerConfig)JSON.parseObject(retJson,ServerConfig.class);updateConfig(serverConfigLocal);System.out.println("new Work server config is:"+serverConfig.toString());}};}// 启动服务器public void start() {System.out.println("work server start...");initRunning();}// 停止服务器public void stop() {System.out.println("work server stop...");zkClient.unsubscribeDataChanges(configPath, dataListener); // 取消监听config节点}// 服务器初始化private void initRunning() {registMe(); // 注册自己zkClient.subscribeDataChanges(configPath, dataListener); // 订阅config节点的改变事件}// 启动时向zookeeper注册自己的注册函数private void registMe() {String mePath = serversPath.concat("/").concat(serverData.getAddress());try {zkClient.createEphemeral(mePath, JSON.toJSONString(serverData).getBytes());} catch (ZkNoNodeException e) {zkClient.createPersistent(serversPath, true);registMe();}}// 更新自己的配置信息private void updateConfig(ServerConfig serverConfig) {this.serverConfig = serverConfig;}}
public class ManageServer {// zookeeper的servers节点路径private String serversPath;// zookeeper的command节点路径private String commandPath;// zookeeper的config节点路径private String configPath;private ZkClient zkClient;private ServerConfig config;// 用于监听servers节点的子节点列表的变化private IZkChildListener childListener;// 用于监听command节点数据内容的变化private IZkDataListener dataListener;// 工作服务器的列表private List<String> workServerList;public ManageServer(String serversPath, String commandPath,String configPath, ZkClient zkClient, ServerConfig config) {this.serversPath = serversPath;this.commandPath = commandPath;this.zkClient = zkClient;this.config = config;this.configPath = configPath;this.childListener = new IZkChildListener() {public void handleChildChange(String parentPath,List<String> currentChilds) throws Exception {// TODO Auto-generated method stubworkServerList = currentChilds; // 更新内存中工作服务器列表System.out.println("work server list changed, new list is ");execList();}};this.dataListener = new IZkDataListener() {public void handleDataDeleted(String dataPath) throws Exception {// TODO Auto-generated method stub// ignore;}public void handleDataChange(String dataPath, Object data)throws Exception {// TODO Auto-generated method stubString cmd = new String((byte[]) data);System.out.println("cmd:"+cmd);exeCmd(cmd); // 执行命令}};}private void initRunning() {zkClient.subscribeDataChanges(commandPath, dataListener);zkClient.subscribeChildChanges(serversPath, childListener);}/** 1: list 2: create 3: modify*/private void exeCmd(String cmdType) {if ("list".equals(cmdType)) {execList();} else if ("create".equals(cmdType)) {execCreate();} else if ("modify".equals(cmdType)) {execModify();} else {System.out.println("error command!" 
+ cmdType);}}// 列出工作服务器列表private void execList() {System.out.println(workServerList.toString());}// 创建config节点private void execCreate() {if (!zkClient.exists(configPath)) {try {zkClient.createPersistent(configPath, JSON.toJSONString(config).getBytes());} catch (ZkNodeExistsException e) {zkClient.writeData(configPath, JSON.toJSONString(config).getBytes()); // config节点已经存在,则写入内容就可以了} catch (ZkNoNodeException e) {String parentDir = configPath.substring(0,configPath.lastIndexOf('/'));zkClient.createPersistent(parentDir, true);execCreate();}}}// 修改config节点内容private void execModify() {// 我们随意修改config的一个属性就可以了config.setDbUser(config.getDbUser() + "_modify");try {zkClient.writeData(configPath, JSON.toJSONString(config).getBytes());} catch (ZkNoNodeException e) {execCreate(); // 写入时config节点还未存在,则创建它}}// 启动工作服务器public void start() {initRunning();}// 停止工作服务器public void stop() {zkClient.unsubscribeChildChanges(serversPath, childListener);zkClient.unsubscribeDataChanges(commandPath, dataListener);}}
/*** 调度类*/public class SubscribeZkClient {private static final int CLIENT_QTY = 5; // Work Server数量private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";private static final String CONFIG_PATH = "/config";private static final String COMMAND_PATH = "/command";private static final String SERVERS_PATH = "/servers";public static void main(String[] args) throws Exception {List<ZkClient> clients = new ArrayList<ZkClient>();List<WorkServer> workServers = new ArrayList<WorkServer>();ManageServer manageServer = null;try {// 创建一个默认的配置ServerConfig initConfig = new ServerConfig();initConfig.setDbPwd("123456");initConfig.setDbUrl("jdbc:mysql://localhost:3306/mydb");initConfig.setDbUser("root");// 实例化一个Manage ServerZkClient clientManage = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());manageServer = new ManageServer(SERVERS_PATH, COMMAND_PATH,CONFIG_PATH,clientManage,initConfig);manageServer.start(); // 启动Manage Server// 创建指定个数的工作服务器for ( int i = 0; i < CLIENT_QTY; ++i ) {ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());clients.add(client);ServerData serverData = new ServerData();serverData.setId(i);serverData.setName("WorkServer#"+i);serverData.setAddress("192.168.1."+i);WorkServer workServer = new WorkServer(CONFIG_PATH, SERVERS_PATH, serverData, client, initConfig);workServers.add(workServer);workServer.start(); // 启动工作服务器}System.out.println("敲回车键退出!\n");new BufferedReader(new InputStreamReader(System.in)).readLine();} finally {System.out.println("Shutting down...");for ( WorkServer workServer : workServers ) {try {workServer.stop();} catch (Exception e) {e.printStackTrace();}}for ( ZkClient client : clients ) {try {client.close();} catch (Exception e) {e.printStackTrace();}}}}}
我们用zookeeper的命令行客户端向Manage Server下达指令。
执行zkCli命令:
create /command list
java控制台输出:
cmd:list[192.168.1.1, 192.168.1.0, 192.168.1.3, 192.168.1.2, 192.168.1.4]
执行zkCli命令:
set /command create
java控制台输出:
cmd:createnew Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
执行zkCli命令:
set /command modify
java控制台输出:
cmd:modify
new Work server config is:ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root_modify]
new Work server config is:ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root_modify]
new Work server config is:ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root_modify]
new Work server config is:ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root_modify]
new Work server config is:ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root_modify]
3. 负载均衡
负载均衡是一种手段,用来把对某种资源的访问分摊给不同的设备,从而减轻单点的压力。
架构图

图中左侧为ZooKeeper集群,右侧上方为工作服务器,下面为客户端。每台工作服务器在启动时都会去zookeeper的servers节点下注册临时节点,每台客户端在启动时都会去servers节点下取得所有可用的工作服务器列表,并通过一定的负载均衡算法计算得出一台工作服务器,并与之建立网络连接。网络连接我们采用开源框架netty。
流程图
负载均衡客户端流程

服务端主体流程

类图
Server端核心类

每个服务端对应一个Server接口,ServerImpl是服务端的实现类。把服务端启动时的注册过程抽出为一个接口RegistProvider,并给予一个默认实现DefaultRegistProvider,它将用到一个上下文的类ZooKeeperRegistContext。我们的服务端是基于Netty的,它需要ServerHandler来处理与客户端之间的连接,当有客户端建立或失去连接时,我们都需要去修改当前服务器的负载信息,我们把修改负载信息的过程也抽出为一个接口BalanceUpdateProvider,并且给予了一个默认实现DefaultBalanceUpdateProvider。ServerRunner是调度类,负责调度我们的Server。
Client端核心类

每个客户端都需要实现一个Client接口,ClientImpl是实现,Client需要ClientHandler来处理与服务器之间的通讯,同时它需要BalanceProvider为它提供负载均衡的算法。BalanceProvider是接口,它有2个实现类,一个是抽象的实现AbstractBalanceProvider,一个是默认的实现DefaultBalanceProvider。ServerData是服务端和客户端共用的一个类,服务端会把自己的基本信息,包括负载信息,打包成ServerData并写入到zookeeper中,客户端在计算负载的时候需要到zookeeper中拿到ServerData,并取得服务端的地址和负载信息。ClientRunner是客户端的调度类,负责启动客户端。
代码实现
先是Server端的代码:
public class ServerData implements Serializable,Comparable<ServerData> {private static final long serialVersionUID = -8892569870391530906L;private Integer balance;private String host;private Integer port;public Integer getBalance() {return balance;}public void setBalance(Integer balance) {this.balance = balance;}public String getHost() {return host;}public void setHost(String host) {this.host = host;}public Integer getPort() {return port;}public void setPort(Integer port) {this.port = port;}@Overridepublic String toString() {return "ServerData [balance=" + balance + ", host=" + host + ", port="+ port + "]";}public int compareTo(ServerData o) {return this.getBalance().compareTo(o.getBalance());}}
/**
 * A server that can be bound so that it starts accepting clients.
 */
public interface Server {

    /** Binds the server. */
    void bind();
}
public class ServerImpl implements Server {private EventLoopGroup bossGroup = new NioEventLoopGroup();private EventLoopGroup workGroup = new NioEventLoopGroup();private ServerBootstrap bootStrap = new ServerBootstrap();private ChannelFuture cf;private String zkAddress;private String serversPath;private String currentServerPath;private ServerData sd;private volatile boolean binded = false;private final ZkClient zc;private final RegistProvider registProvider;private static final Integer SESSION_TIME_OUT = 10000;private static final Integer CONNECT_TIME_OUT = 10000;public String getCurrentServerPath() {return currentServerPath;}public String getZkAddress() {return zkAddress;}public String getServersPath() {return serversPath;}public ServerData getSd() {return sd;}public void setSd(ServerData sd) {this.sd = sd;}public ServerImpl(String zkAddress, String serversPath, ServerData sd){this.zkAddress = zkAddress;this.serversPath = serversPath;this.zc = new ZkClient(this.zkAddress,SESSION_TIME_OUT,CONNECT_TIME_OUT,new SerializableSerializer());this.registProvider = new DefaultRegistProvider();this.sd = sd;}//初始化服务端private void initRunning() throws Exception {String mePath = serversPath.concat("/").concat(sd.getPort().toString());//注册到zookeeperregistProvider.regist(new ZooKeeperRegistContext(mePath,zc,sd));currentServerPath = mePath;}public void bind() {if (binded){return;}System.out.println(sd.getPort()+":binding...");try {initRunning();} catch (Exception e) {e.printStackTrace();return;}bootStrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new ServerHandler(new DefaultBalanceUpdateProvider(currentServerPath,zc)));}});try {cf = bootStrap.bind(sd.getPort()).sync();binded = 
true;System.out.println(sd.getPort()+":binded...");cf.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}finally{bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}}
/**
 * Strategy for registering and unregistering a server. The meaning of the
 * context object is defined by the implementation.
 */
public interface RegistProvider {

    /**
     * Registers using the given context.
     *
     * @param context implementation-specific registration data
     * @throws Exception if registration fails
     */
    void regist(Object context) throws Exception;

    /**
     * Unregisters using the given context.
     *
     * @param context implementation-specific registration data
     * @throws Exception if unregistration fails
     */
    void unRegist(Object context) throws Exception;
}
public class DefaultRegistProvider implements RegistProvider {// 在zookeeper中创建临时节点并写入信息public void regist(Object context) throws Exception {// Server在zookeeper中注册自己,需要在zookeeper的目标节点上创建临时节点并写入自己// 将需要的以下3个信息包装成上下文传入// 1:path// 2:zkClient// 3:serverDataZooKeeperRegistContext registContext = (ZooKeeperRegistContext) context;String path = registContext.getPath();ZkClient zc = registContext.getZkClient();try {zc.createEphemeral(path, registContext.getData());} catch (ZkNoNodeException e) {String parentDir = path.substring(0, path.lastIndexOf('/'));zc.createPersistent(parentDir, true);regist(registContext);}}public void unRegist(Object context) throws Exception {return;}}
/**
 * Bundle of the three values needed for a ZooKeeper registration: the node
 * path, the client to use and the data to store.
 */
public class ZooKeeperRegistContext {

    /** Path of the node to create. */
    private String path;

    /** Client used to talk to ZooKeeper. */
    private ZkClient zkClient;

    /** Payload to store in the node. */
    private Object data;

    public ZooKeeperRegistContext(String path, ZkClient zkClient, Object data) {
        this.path = path;
        this.zkClient = zkClient;
        this.data = data;
    }

    public Object getData() {
        return this.data;
    }

    public void setData(Object data) {
        this.data = data;
    }

    public ZkClient getZkClient() {
        return this.zkClient;
    }

    public void setZkClient(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    public String getPath() {
        return this.path;
    }

    public void setPath(String path) {
        this.path = path;
    }
}
/*** 处理服务端与客户端之间的通信*/public class ServerHandler extends ChannelHandlerAdapter{private final BalanceUpdateProvider balanceUpdater;private static final Integer BALANCE_STEP = 1;public ServerHandler(BalanceUpdateProvider balanceUpdater){this.balanceUpdater = balanceUpdater;}public BalanceUpdateProvider getBalanceUpdater() {return balanceUpdater;}// 建立连接时增加负载@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("one client connect...");balanceUpdater.addBalance(BALANCE_STEP);}// 断开连接时减少负载@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {balanceUpdater.reduceBalance(BALANCE_STEP);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}}
/**
 * Strategy for publishing changes of a server's load ("balance").
 */
public interface BalanceUpdateProvider {

    /**
     * Increases the balance.
     *
     * @param step amount to add
     * @return {@code true} if the update was applied
     */
    public boolean addBalance(Integer step);

    /**
     * Decreases the balance.
     *
     * @param step amount to subtract
     * @return {@code true} if the update was applied
     */
    public boolean reduceBalance(Integer step);
}
public class DefaultBalanceUpdateProvider implements BalanceUpdateProvider {private String serverPath;private ZkClient zc;public DefaultBalanceUpdateProvider(String serverPath, ZkClient zkClient) {this.serverPath = serverPath;this.zc = zkClient;}public boolean addBalance(Integer step) {Stat stat = new Stat();ServerData sd;// 增加负载:读取服务器的信息ServerData,增加负载,并写回zookeeperwhile (true) {try {sd = zc.readData(this.serverPath, stat);sd.setBalance(sd.getBalance() + step);// 带上版本,因为可能有其他客户端连接到服务器修改了负载zc.writeData(this.serverPath, sd, stat.getVersion());return true;} catch (ZkBadVersionException e) {// ignore} catch (Exception e) {return false;}}}public boolean reduceBalance(Integer step) {Stat stat = new Stat();ServerData sd;while (true) {try {sd = zc.readData(this.serverPath, stat);final Integer currBalance = sd.getBalance();sd.setBalance(currBalance>step?currBalance-step:0);zc.writeData(this.serverPath, sd, stat.getVersion());return true;} catch (ZkBadVersionException e) {// ignore} catch (Exception e) {return false;}}}}
/*** 用于测试,负责启动Work Server*/public class ServerRunner {private static final int SERVER_QTY = 2;private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";private static final String SERVERS_PATH = "/servers";public static void main(String[] args) {List<Thread> threadList = new ArrayList<Thread>();for(int i=0; i<SERVER_QTY; i++){final Integer count = i;Thread thread = new Thread(new Runnable() {public void run() {ServerData sd = new ServerData();sd.setBalance(0);sd.setHost("127.0.0.1");sd.setPort(6000+count);Server server = new ServerImpl(ZOOKEEPER_SERVER,SERVERS_PATH,sd);server.bind();}});threadList.add(thread);thread.start();}for (int i=0; i<threadList.size(); i++){try {threadList.get(i).join();} catch (InterruptedException ignore) {//}}}}
再看Client端的代码:
/**
 * A client that can connect to and disconnect from a server.
 */
public interface Client {

    /**
     * Connects to a server.
     *
     * @throws Exception if connecting fails
     */
    public void connect() throws Exception;

    /**
     * Disconnects from the server.
     *
     * @throws Exception if disconnecting fails
     */
    public void disConnect() throws Exception;
}
public class ClientImpl implements Client {private final BalanceProvider<ServerData> provider;private EventLoopGroup group = null;private Channel channel = null;private final Logger log = LoggerFactory.getLogger(getClass());public ClientImpl(BalanceProvider<ServerData> provider) {this.provider = provider;}public BalanceProvider<ServerData> getProvider() {return provider;}public void connect(){try{ServerData serverData = provider.getBalanceItem(); // 获取负载最小的服务器信息,并与之建立连接System.out.println("connecting to "+serverData.getHost()+":"+serverData.getPort()+", it's balance:"+serverData.getBalance());group = new NioEventLoopGroup();Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new ClientHandler());}});ChannelFuture f = b.connect(serverData.getHost(),serverData.getPort()).syncUninterruptibly();channel = f.channel();System.out.println("started success!");}catch(Exception e){System.out.println("连接异常:"+e.getMessage());}}public void disConnect(){try{if (channel!=null)channel.close().syncUninterruptibly();group.shutdownGracefully();group = null;log.debug("disconnected!");}catch(Exception e){log.error(e.getMessage());}}}
public class ClientHandler extends ChannelHandlerAdapter {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// Close the connection when an exception is raised.cause.printStackTrace();ctx.close();}}
/**
 * Supplies the item a load-balancing decision selected.
 *
 * @param <T> type of the selectable items
 */
public interface BalanceProvider<T> {

    /** @return the selected item */
    T getBalanceItem();
}
/**
 * Template for balance providers: subclasses supply the candidate items and
 * the algorithm that chooses among them.
 *
 * @param <T> type of the selectable items
 */
public abstract class AbstractBalanceProvider<T> implements BalanceProvider<T> {

    /**
     * Chooses one item.
     *
     * @param items the candidates
     * @return the chosen item
     */
    protected abstract T balanceAlgorithm(List<T> items);

    /** @return the candidate items */
    protected abstract List<T> getBalanceItems();

    /** Fetches the candidates and applies the algorithm to them. */
    public T getBalanceItem() {
        List<T> candidates = getBalanceItems();
        return balanceAlgorithm(candidates);
    }
}
public class DefaultBalanceProvider extends AbstractBalanceProvider<ServerData> {private final String zkServer; // zookeeper服务器地址private final String serversPath; // servers节点路径private final ZkClient zc;private static final Integer SESSION_TIME_OUT = 10000;private static final Integer CONNECT_TIME_OUT = 10000;public DefaultBalanceProvider(String zkServer, String serversPath) {this.serversPath = serversPath;this.zkServer = zkServer;this.zc = new ZkClient(this.zkServer, SESSION_TIME_OUT, CONNECT_TIME_OUT,new SerializableSerializer());}@Overrideprotected ServerData balanceAlgorithm(List<ServerData> items) {if (items.size()>0){Collections.sort(items); // 根据负载由小到大排序return items.get(0); // 返回负载最小的那个}else{return null;}}/*** 从zookeeper中拿到所有工作服务器的基本信息*/@Overrideprotected List<ServerData> getBalanceItems() {List<ServerData> sdList = new ArrayList<ServerData>();List<String> children = zc.getChildren(this.serversPath);for(int i=0; i<children.size();i++){ServerData sd = zc.readData(serversPath+"/"+children.get(i));sdList.add(sd);}return sdList;}}
public class ClientRunner {private static final int CLIENT_QTY = 3;private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";private static final String SERVERS_PATH = "/servers";public static void main(String[] args) {List<Thread> threadList = new ArrayList<Thread>(CLIENT_QTY);final List<Client> clientList = new ArrayList<Client>();final BalanceProvider<ServerData> balanceProvider = new DefaultBalanceProvider(ZOOKEEPER_SERVER, SERVERS_PATH);try{for(int i=0; i<CLIENT_QTY; i++){Thread thread = new Thread(new Runnable() {public void run() {Client client = new ClientImpl(balanceProvider);clientList.add(client);try {client.connect();} catch (Exception e) {e.printStackTrace();}}});threadList.add(thread);thread.start();//延时Thread.sleep(2000);}System.out.println("敲回车键退出!\n");new BufferedReader(new InputStreamReader(System.in)).readLine();}catch(Exception e){e.printStackTrace();}finally{//关闭客户端for (int i=0; i<clientList.size(); i++){try {clientList.get(i);clientList.get(i).disConnect();} catch (Exception ignore) {//ignore}}//关闭线程for (int i=0; i<threadList.size(); i++){threadList.get(i).interrupt();try{threadList.get(i).join();}catch (InterruptedException e){//ignore}}}}}
我们先启动服务端ServerRunner
6000:binding...6000:binded...6001:binding...6001:binded...
再来启动客户端ClientRunner
connecting to 127.0.0.1:6000, it's balance 0started success!connecting to 127.0.0.1:6001, it's balance 0started success!connecting to 127.0.0.1:6000, it's balance 1started success!敲回车退出!
4. 分布式锁
我们常说的锁是单进程多线程锁,在多线程并发编程中,用于线程之间的数据同步,保护共享资源的访问。而分布式锁,指在分布式环境下,保护跨进程、跨主机、跨网络的共享资源,实现互斥访问,保证一致性。
架构图

左侧是zookeeper集群,locker是数据节点,node_1到node_n代表一系列的顺序节点。
右侧client_1至client_n代表客户端,Service代表需要互斥访问的服务。
总实现思路,是在获取锁的时候在locker节点下创建顺序节点,在释放锁的时候,把自己创建的节点删除。
流程图

类图
代码实现
/**
 * A lock shared between processes.
 */
public interface DistributedLock {

    /**
     * Acquires the lock, waiting until it becomes available.
     *
     * @throws Exception if the lock cannot be acquired
     */
    void acquire() throws Exception;

    /**
     * Acquires the lock, waiting at most the given time.
     *
     * @param time maximum time to wait
     * @param unit unit of {@code time}
     * @return whether the lock was acquired
     * @throws Exception if acquiring fails
     */
    boolean acquire(long time, TimeUnit unit) throws Exception;

    /**
     * Releases the lock.
     *
     * @throws Exception if releasing fails
     */
    void release() throws Exception;
}
public class SimpleDistributedLockMutex extends BaseDistributedLock implementsDistributedLock {//锁名称前缀,成功创建的顺序节点如lock-0000000000,lock-0000000001,...private static final String LOCK_NAME = "lock-";// zookeeper中locker节点的路径private final String basePath;// 获取锁以后自己创建的那个顺序节点的路径private String ourLockPath;private boolean internalLock(long time, TimeUnit unit) throws Exception {ourLockPath = attemptLock(time, unit);return ourLockPath != null;}public SimpleDistributedLockMutex(ZkClientExt client, String basePath){super(client,basePath,LOCK_NAME);this.basePath = basePath;}// 获取锁public void acquire() throws Exception {if ( !internalLock(-1, null) ) {throw new IOException("连接丢失!在路径:'"+basePath+"'下不能获取锁!");}}// 获取锁,可以超时public boolean acquire(long time, TimeUnit unit) throws Exception {return internalLock(time, unit);}// 释放锁public void release() throws Exception {releaseLock(ourLockPath);}}
public class BaseDistributedLock {private final ZkClientExt client;private final String path;private final String basePath;private final String lockName;private static final Integer MAX_RETRY_COUNT = 10;public BaseDistributedLock(ZkClientExt client, String path, String lockName){this.client = client;this.basePath = path;this.path = path.concat("/").concat(lockName);this.lockName = lockName;}// 删除成功获取锁之后所创建的那个顺序节点private void deleteOurPath(String ourPath) throws Exception{client.delete(ourPath);}// 创建临时顺序节点private String createLockNode(ZkClient client, String path) throws Exception{return client.createEphemeralSequential(path, null);}// 等待比自己次小的顺序节点的删除private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception{boolean haveTheLock = false;boolean doDelete = false;try {while ( !haveTheLock ) {// 获取/locker下的经过排序的子节点列表List<String> children = getSortedChildren();// 获取刚才自己创建的那个顺序节点名String sequenceNodeName = ourPath.substring(basePath.length()+1);// 判断自己排第几个int ourIndex = children.indexOf(sequenceNodeName);if (ourIndex < 0){ // 网络抖动,获取到的子节点列表里可能已经没有自己了throw new ZkNoNodeException("节点没有找到: " + sequenceNodeName);}// 如果是第一个,代表自己已经获得了锁boolean isGetTheLock = ourIndex == 0;// 如果自己没有获得锁,则要watch比我们次小的那个节点String pathToWatch = isGetTheLock ? 
null : children.get(ourIndex - 1);if ( isGetTheLock ){haveTheLock = true;} else {// 订阅比自己次小顺序节点的删除事件String previousSequencePath = basePath .concat( "/" ) .concat( pathToWatch );final CountDownLatch latch = new CountDownLatch(1);final IZkDataListener previousListener = new IZkDataListener() {public void handleDataDeleted(String dataPath) throws Exception {latch.countDown(); // 删除后结束latch上的await}public void handleDataChange(String dataPath, Object data) throws Exception {// ignore}};try {//订阅次小顺序节点的删除事件,如果节点不存在会出现异常client.subscribeDataChanges(previousSequencePath, previousListener);if ( millisToWait != null ) {millisToWait -= (System.currentTimeMillis() - startMillis);startMillis = System.currentTimeMillis();if ( millisToWait <= 0 ) {doDelete = true; // timed out - delete our nodebreak;}latch.await(millisToWait, TimeUnit.MICROSECONDS); // 在latch上await} else {latch.await(); // 在latch上await}// 结束latch上的等待后,继续while重新来过判断自己是否第一个顺序节点}catch ( ZkNoNodeException e ) {//ignore} finally {client.unsubscribeDataChanges(previousSequencePath, previousListener);}}}}catch ( Exception e ) {//发生异常需要删除节点doDelete = true;throw e;} finally {//如果需要删除节点if ( doDelete ) {deleteOurPath(ourPath);}}return haveTheLock;}private String getLockNodeNumber(String str, String lockName) {int index = str.lastIndexOf(lockName);if ( index >= 0 ) {index += lockName.length();return index <= str.length() ? 
str.substring(index) : "";}return str;}// 获取/locker下的经过排序的子节点列表List<String> getSortedChildren() throws Exception {try{List<String> children = client.getChildren(basePath);Collections.sort(children, new Comparator<String>() {public int compare(String lhs, String rhs) {return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName));}});return children;} catch (ZkNoNodeException e){client.createPersistent(basePath, true);return getSortedChildren();}}protected void releaseLock(String lockPath) throws Exception{deleteOurPath(lockPath);}protected String attemptLock(long time, TimeUnit unit) throws Exception {final long startMillis = System.currentTimeMillis();final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;String ourPath = null;boolean hasTheLock = false;boolean isDone = false;int retryCount = 0;//网络闪断需要重试一试while ( !isDone ) {isDone = true;try {// 在/locker下创建临时的顺序节点ourPath = createLockNode(client, path);// 判断自己是否获得了锁,如果没有获得那么等待直到获得锁或者超时hasTheLock = waitToLock(startMillis, millisToWait, ourPath);} catch ( ZkNoNodeException e ) { // 捕获这个异常if ( retryCount++ < MAX_RETRY_COUNT ) { // 重试指定次数isDone = false;} else {throw e;}}}if ( hasTheLock ) {return ourPath;}return null;}}
/**
 * Demo driver for the distributed mutex.
 *
 * <p>Client 1 acquires the lock on the main thread, a second thread then tries to
 * acquire the same lock through a separate ZooKeeper connection, and client 1
 * releases the lock after five seconds so that client 2 can proceed.
 */
public class TestDistributedLock {

    public static void main(String[] args) {
        final ZkClientExt zkClientExt1 =
                new ZkClientExt("192.168.1.105:2181", 5000, 5000, new BytesPushThroughSerializer());
        final SimpleDistributedLockMutex mutex1 = new SimpleDistributedLockMutex(zkClientExt1, "/Mutex");

        final ZkClientExt zkClientExt2 =
                new ZkClientExt("192.168.1.105:2181", 5000, 5000, new BytesPushThroughSerializer());
        final SimpleDistributedLockMutex mutex2 = new SimpleDistributedLockMutex(zkClientExt2, "/Mutex");

        try {
            mutex1.acquire();
            System.out.println("Client1 locked");

            // The second client competes for the same lock from its own thread.
            Thread client2Thd = new Thread(new Runnable() {
                public void run() {
                    try {
                        mutex2.acquire();
                        System.out.println("Client2 locked");
                        mutex2.release();
                        System.out.println("Client2 released lock");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            client2Thd.start();

            // Hold the lock for a while so that client 2 visibly has to wait.
            Thread.sleep(5000);
            mutex1.release();
            System.out.println("Client1 released lock");

            client2Thd.join();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close both ZooKeeper connections; the original demo leaked them.
            try {
                zkClientExt1.close();
            } finally {
                zkClientExt2.close();
            }
        }
    }
}
public class ZkClientExt extends ZkClient {public ZkClientExt(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer) {super(zkServers, sessionTimeout, connectionTimeout, zkSerializer);}@Overridepublic void watchForData(final String path) {retryUntilConnected(new Callable<Object>() {public Object call() throws Exception {Stat stat = new Stat();_connection.readData(path, stat, true);return null;}});}}
5. 分布式队列
在传统的单进程编程中,我们使用队列来存储一些数据结构,用来在多线程之间共享或传递数据。
分布式环境下,我们同样需要一个类似单进程队列的组件,用来实现跨进程、跨主机、跨网络的数据共享和数据传递,这就是我们的分布式队列。
zookeeper可以通过顺序节点实现分布式队列。
架构图

图中左侧代表zookeeper集群,右侧代表消费者和生产者。
生产者通过在queue节点下创建顺序节点来存放数据,消费者通过读取顺序节点来消费数据。
流程图
offer核心算法流程

poll核心算法流程

代码实现
/*** 简单分布式队列*/public class DistributedSimpleQueue<T> {protected final ZkClient zkClient;// queue节点protected final String root;// 顺序节点前缀protected static final String Node_NAME = "n_";public DistributedSimpleQueue(ZkClient zkClient, String root) {this.zkClient = zkClient;this.root = root;}// 判断队列大小public int size() {return zkClient.getChildren(root).size();}// 判断队列是否为空public boolean isEmpty() {return zkClient.getChildren(root).size() == 0;}// 向队列提供数据public boolean offer(T element) throws Exception{// 创建顺序节点String nodeFullPath = root .concat( "/" ).concat( Node_NAME );try {zkClient.createPersistentSequential(nodeFullPath , element);}catch (ZkNoNodeException e) {zkClient.createPersistent(root);offer(element);} catch (Exception e) {throw ExceptionUtil.convertToRuntimeException(e);}return true;}// 从队列取数据public T poll() throws Exception {try {// 获取所有顺序节点List<String> list = zkClient.getChildren(root);if (list.size() == 0) {return null;}// 排序Collections.sort(list, new Comparator<String>() {public int compare(String lhs, String rhs) {return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));}});// 循环每个顺序节点名for ( String nodeName : list ){// 构造出顺序节点的完整路径String nodeFullPath = root.concat("/").concat(nodeName);try {// 读取顺序节点的内容T node = (T) zkClient.readData(nodeFullPath);// 删除顺序节点zkClient.delete(nodeFullPath);return node;} catch (ZkNoNodeException e) {// ignore 由其他客户端把这个顺序节点消费掉了}}return null;} catch (Exception e) {throw ExceptionUtil.convertToRuntimeException(e);}}private String getNodeNumber(String str, String nodeName) {int index = str.lastIndexOf(nodeName);if (index >= 0) {index += Node_NAME.length();return index <= str.length() ? str.substring(index) : "";}return str;}}
/**
 * Serializable sample payload used by the queue demos: a user with an id and a name.
 */
public class User implements Serializable {

    String name;
    String id;

    /** Returns the user's id, or {@code null} if none has been set. */
    public String getId() {
        return this.id;
    }

    /** Sets the user's id. */
    public void setId(String id) {
        this.id = id;
    }

    /** Returns the user's name, or {@code null} if none has been set. */
    public String getName() {
        return this.name;
    }

    /** Sets the user's name. */
    public void setName(String name) {
        this.name = name;
    }
}
/**
 * Demo driver for {@code DistributedSimpleQueue}: offers two users and checks that
 * they are polled back in the same order.
 */
public class TestDistributedSimpleQueue {

    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer());
        DistributedSimpleQueue<User> queue = new DistributedSimpleQueue<User>(zkClient, "/Queue");

        User user1 = new User();
        user1.setId("1");
        user1.setName("xiao wang");

        User user2 = new User();
        user2.setId("2");
        user2.setName("xiao wang");

        try {
            queue.offer(user1);
            queue.offer(user2);
            User u1 = queue.poll();
            User u2 = queue.poll();
            // poll() returns null on an empty queue (e.g. another consumer took the
            // elements), so guard before dereferencing.
            if (u1 != null && u2 != null
                    && user1.getId().equals(u1.getId())
                    && user2.getId().equals(u2.getId())) {
                System.out.println("Success!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the ZooKeeper connection; the original demo leaked it.
            zkClient.close();
        }
    }
}
上面实现了一个简单分布式队列,在此基础上,我们再扩展一个阻塞分布式队列。代码如下:
/*** 阻塞分布式队列* 扩展自简单分布式队列,在拿不到队列数据时,进行阻塞直到拿到数据*/public class DistributedBlockingQueue<T> extends DistributedSimpleQueue<T>{public DistributedBlockingQueue(ZkClient zkClient, String root) {super(zkClient, root);}@Overridepublic T poll() throws Exception {while (true){ // 结束在latch上的等待后,再来一次final CountDownLatch latch = new CountDownLatch(1);final IZkChildListener childListener = new IZkChildListener() {public void handleChildChange(String parentPath, List<String> currentChilds)throws Exception {latch.countDown(); // 队列有变化,结束latch上的等待}};zkClient.subscribeChildChanges(root, childListener);try{T node = super.poll(); // 获取队列数据if ( node != null ){return node;} else {latch.await(); // 拿不到队列数据,则在latch上await}} finally {zkClient.unsubscribeChildChanges(root, childListener);}}}}
/**
 * Demo driver for {@code DistributedBlockingQueue}: the main thread starts polling
 * immediately while a scheduled task offers two users five seconds later.
 */
public class TestDistributedBlockingQueue {

    public static void main(String[] args) {
        ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
        int delayTime = 5;

        ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer());
        final DistributedBlockingQueue<User> queue = new DistributedBlockingQueue<User>(zkClient, "/Queue");

        final User user1 = new User();
        user1.setId("1");
        user1.setName("xiao wang");

        final User user2 = new User();
        user2.setId("2");
        user2.setName("xiao wang");

        try {
            // Produce with a delay so that the consumer below really has to block.
            delayExector.schedule(new Runnable() {
                public void run() {
                    try {
                        queue.offer(user1);
                        queue.offer(user2);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, delayTime, TimeUnit.SECONDS);

            System.out.println("ready poll!");
            User u1 = queue.poll();
            User u2 = queue.poll();

            if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())) {
                System.out.println("Success!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            boolean interrupted = false;
            delayExector.shutdown();
            try {
                delayExector.awaitTermination(2, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Remember the interruption; it is restored after the client is closed
                // so that closing is not itself cut short by the pending interrupt.
                interrupted = true;
            } finally {
                // Release the ZooKeeper connection; the original demo leaked it.
                zkClient.close();
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
6. 分布式命名服务
zookeeper的命名服务有两个应用方向,一个是提供类似JNDI的功能,利用zookeeper的树形分层结构,可以把系统中各种服务的名称、地址以及目录信息存放在zookeeper,需要的时候去zookeeper中读取。
另一个,是利用zookeeper顺序节点的特性,制作分布式的ID生成器。写过数据库应用的朋友都知道,我们在往数据库表中插入记录时,通常需要为该记录创建唯一的ID,在单机环境中我们可以利用数据库的主键自增功能,但在分布式环境中则无法使用。一种替代方式是使用UUID,但它的缺陷是没有规律,很难理解。利用zookeeper顺序节点的特性,我们可以生成有顺序的、容易理解的、同时支持分布式环境的序列号。
代码实现
public class IdMaker {private ZkClient client = null;private final String server;// zookeeper顺序节点的父节点private final String root;// 顺序节点的名称private final String nodeName;// 标识当前服务是否正在运行private volatile boolean running = false;private ExecutorService cleanExector = null;public enum RemoveMethod{NONE,IMMEDIATELY,DELAY}public IdMaker(String zkServer,String root,String nodeName){this.root = root;this.server = zkServer;this.nodeName = nodeName;}// 启动服务public void start() throws Exception {if (running)throw new Exception("server has stated...");running = true;init();}// 停止服务public void stop() throws Exception {if (!running)throw new Exception("server has stopped...");running = false;freeResource();}// 初始化服务资源private void init(){client = new ZkClient(server,5000,5000,new BytesPushThroughSerializer());cleanExector = Executors.newFixedThreadPool(10);try{client.createPersistent(root,true);}catch (ZkNodeExistsException e){//ignore;}}// 释放服务器资源private void freeResource(){// 释放线程池cleanExector.shutdown();try{cleanExector.awaitTermination(2, TimeUnit.SECONDS);}catch(InterruptedException e){e.printStackTrace();}finally{cleanExector = null;}if (client!=null){client.close();client=null;}}// 检测当前服务是否正在运行private void checkRunning() throws Exception {if (!running)throw new Exception("请先调用start");}// 从顺序节点名中提取我们要的ID值private String ExtractId(String str){int index = str.lastIndexOf(nodeName);if (index >= 0){index+=nodeName.length();return index <= str.length()?str.substring(index):"";}return str;}// 生成IDpublic String generateId(RemoveMethod removeMethod) throws Exception{checkRunning();// 构造顺序节点的完整路径final String fullNodePath = root.concat("/").concat(nodeName);// 创建持久化顺序节点final String ourPath = client.createPersistentSequential(fullNodePath, null);// 避免zookeeper的顺序节点暴增,直接删除掉刚创建的顺序节点if (removeMethod.equals(RemoveMethod.IMMEDIATELY)){ // 立即删除client.delete(ourPath);}else if (removeMethod.equals(RemoveMethod.DELAY)){ // 延迟删除cleanExector.execute(new Runnable() { // 
用线程池执行删除,让generateId()方法尽快返回public void run() {client.delete(ourPath);}});}//node-0000000000, node-0000000001return ExtractId(ourPath);}}
/**
 * Demo driver for {@code IdMaker}: starts the service, prints ten generated IDs
 * using delayed node removal, and always stops the service afterwards.
 */
public class TestIdMaker {

    public static void main(String[] args) throws Exception {
        IdMaker idMaker = new IdMaker("192.168.1.105:2181", "/NameService/IdGen", "ID");
        idMaker.start();

        try {
            int remaining = 10;
            while (remaining-- > 0) {
                System.out.println(idMaker.generateId(RemoveMethod.DELAY));
            }
        } finally {
            idMaker.stop();
        }
    }
}
