1. master选举

考虑7*24小时向外提供服务的系统,不能有单点故障,于是我们使用集群,采用的是Master+Slave。集群中有一台主机和多台备机,由主机向外提供服务,备机监听主机状态,一旦主机宕机,备机必须迅速接管主机继续向外提供服务。在这个过程中,从备机中选出一台机器作为主机的过程,就是Master选举。

架构图

Zookeeper使用案例 - 图1

左边是ZooKeeper集群,右边是3台工作服务器。工作服务器启动时,会去ZooKeeper的Servers节点下创建临时节点,并把基本信息写入临时节点。这个过程叫服务注册,系统中的其他服务可以通过获取Servers节点的子节点列表,来了解当前系统哪些服务器可用,该过程叫做服务发现。接着这些服务器会尝试创建Master临时节点,谁创建成功谁就是Master,其他的两台就作为Slave。所有的Work Server必须关注Master节点的删除事件。通过监听Master节点的删除事件,来了解Master服务器是否宕机(创建临时节点的服务器一旦宕机,它所创建的临时节点即会自动删除)。一旦Master服务器宕机,必须开始新一轮的Master选举。

流程图

Zookeeper使用案例 - 图2

核心类图

Zookeeper使用案例 - 图3

WorkServer对应架构图的WorkServer,是主工作类;
RunningData用来描述WorkServer的基本信息;
LeaderSelectorZkClient作为调度器来启动和停止WorkServer;

代码实现

  1. /**
  2. * 工作服务器信息
  3. */
  4. public class RunningData implements Serializable {
  5. private static final long serialVersionUID = 4260577459043203630L;
  6. private Long cid;
  7. private String name;
  8. public Long getCid() {
  9. return cid;
  10. }
  11. public void setCid(Long cid) {
  12. this.cid = cid;
  13. }
  14. public String getName() {
  15. return name;
  16. }
  17. public void setName(String name) {
  18. this.name = name;
  19. }
  20. }
  1. /**
  2. * 工作服务器
  3. */
  4. public class WorkServer {
  5. // 记录服务器状态
  6. private volatile boolean running = false;
  7. private ZkClient zkClient;
  8. // Master节点对应zookeeper中的节点路径
  9. private static final String MASTER_PATH = "/master";
  10. // 监听Master节点删除事件
  11. private IZkDataListener dataListener;
  12. // 记录当前节点的基本信息
  13. private RunningData serverData;
  14. // 记录集群中Master节点的基本信息
  15. private RunningData masterData;
  16. private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
  17. private int delayTime = 5;
  18. public WorkServer(RunningData rd) {
  19. this.serverData = rd; // 记录服务器基本信息
  20. this.dataListener = new IZkDataListener() {
  21. public void handleDataDeleted(String dataPath) throws Exception {
  22. //takeMaster();
  23. if (masterData != null && masterData.getName().equals(serverData.getName())){
  24. // 自己就是上一轮的Master服务器,则直接抢
  25. takeMaster();
  26. } else {
  27. // 否则,延迟5秒后再抢。主要是应对网络抖动,给上一轮的Master服务器优先抢占master的权利,避免不必要的数据迁移开销
  28. delayExector.schedule(new Runnable(){
  29. public void run(){
  30. takeMaster();
  31. }
  32. }, delayTime, TimeUnit.SECONDS);
  33. }
  34. }
  35. public void handleDataChange(String dataPath, Object data)
  36. throws Exception {
  37. }
  38. };
  39. }
  40. public ZkClient getZkClient() {
  41. return zkClient;
  42. }
  43. public void setZkClient(ZkClient zkClient) {
  44. this.zkClient = zkClient;
  45. }
  46. // 启动服务器
  47. public void start() throws Exception {
  48. if (running) {
  49. throw new Exception("server has startup...");
  50. }
  51. running = true;
  52. // 订阅Master节点删除事件
  53. zkClient.subscribeDataChanges(MASTER_PATH, dataListener);
  54. // 争抢Master权利
  55. takeMaster();
  56. }
  57. // 停止服务器
  58. public void stop() throws Exception {
  59. if (!running) {
  60. throw new Exception("server has stoped");
  61. }
  62. running = false;
  63. delayExector.shutdown();
  64. // 取消Master节点事件订阅
  65. zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);
  66. // 释放Master权利
  67. releaseMaster();
  68. }
  69. // 争抢Master
  70. private void takeMaster() {
  71. if (!running)
  72. return;
  73. try {
  74. // 尝试创建Master临时节点
  75. zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL);
  76. masterData = serverData;
  77. System.out.println(serverData.getName()+" is master");
  78. // 作为演示,我们让服务器每隔5秒释放一次Master权利
  79. delayExector.schedule(new Runnable() {
  80. public void run() {
  81. // TODO Auto-generated method stub
  82. if (checkMaster()){
  83. releaseMaster();
  84. }
  85. }
  86. }, 5, TimeUnit.SECONDS);
  87. } catch (ZkNodeExistsException e) { // 已被其他服务器创建了
  88. // 读取Master节点信息
  89. RunningData runningData = zkClient.readData(MASTER_PATH, true);
  90. if (runningData == null) {
  91. takeMaster(); // 没读到,读取瞬间Master节点宕机了,有机会再次争抢
  92. } else {
  93. masterData = runningData;
  94. }
  95. } catch (Exception e) {
  96. // ignore;
  97. }
  98. }
  99. // 释放Master权利
  100. private void releaseMaster() {
  101. if (checkMaster()) {
  102. zkClient.delete(MASTER_PATH);
  103. }
  104. }
  105. // 检测自己是否为Master
  106. private boolean checkMaster() {
  107. try {
  108. RunningData eventData = zkClient.readData(MASTER_PATH);
  109. masterData = eventData;
  110. if (masterData.getName().equals(serverData.getName())) {
  111. return true;
  112. }
  113. return false;
  114. } catch (ZkNoNodeException e) {
  115. return false; // 节点不存在,自己肯定不是Master了
  116. } catch (ZkInterruptedException e) {
  117. return checkMaster();
  118. } catch (ZkException e) {
  119. return false;
  120. }
  121. }
  122. }
  1. /**
  2. * 调度器
  3. */
  4. public class LeaderSelectorZkClient {
  5. //启动的服务个数
  6. private static final int CLIENT_QTY = 10;
  7. //zookeeper服务器的地址
  8. private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";
  9. public static void main(String[] args) throws Exception {
  10. //保存所有zkClient的列表
  11. List<ZkClient> clients = new ArrayList<ZkClient>();
  12. //保存所有服务的列表
  13. List<WorkServer> workServers = new ArrayList<WorkServer>();
  14. try {
  15. for ( int i = 0; i < CLIENT_QTY; ++i ) { // 模拟创建10个服务器并启动
  16. //创建zkClient
  17. ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer());
  18. clients.add(client);
  19. //创建serverData
  20. RunningData runningData = new RunningData();
  21. runningData.setCid(Long.valueOf(i));
  22. runningData.setName("Client #" + i);
  23. //创建服务
  24. WorkServer workServer = new WorkServer(runningData);
  25. workServer.setZkClient(client);
  26. workServers.add(workServer);
  27. workServer.start();
  28. }
  29. System.out.println("敲回车键退出!\n");
  30. new BufferedReader(new InputStreamReader(System.in)).readLine();
  31. } finally {
  32. System.out.println("Shutting down...");
  33. for ( WorkServer workServer : workServers ) {
  34. try {
  35. workServer.stop();
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. for ( ZkClient client : clients ) {
  41. try {
  42. client.close();
  43. } catch (Exception e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. }
  48. }
  49. }

2. 数据发布订阅

多个订阅者对象同时监听同一主题对象,主题对象状态变化时通知所有订阅者对象更新自身状态。发布方和订阅方独立封装、独立改变,当一个对象的改变需要同时改变其他对象,并且它不知道有多少个对象需要改变时,可以使用发布订阅模式。

在分布式系统中的顶级应用有配置管理和服务发现。

配置管理:指集群中的机器拥有某些配置,并且这些配置信息需要动态地改变,那么我们就可以使用发布订阅模式把配置做统一的管理,让这些机器订阅配置信息的改变,当配置改变时,这些机器会得到通知并更新自己的配置。

服务发现:指对集群中的服务上下线做统一管理,每个工作服务器都可以作为数据的发布方,向集群注册自己的基本信息,而让某些监控服务器作为订阅方,订阅工作服务器的基本信息。当工作服务器的基本信息改变时,如服务上下线、服务器的角色或服务范围变更,那么监控服务器可以得到通知并响应这些变化。

架构图

Zookeeper使用案例 - 图4

左边代表Zookeeper集群,右侧代表服务器集群。其中前3个为工作服务器Work Server,绿色为管理服务器Manage Server,最下面的是控制服务器Control Server。

config节点,用于配置管理。Manage Server通过config节点下发配置信息,Work Server可以通过订阅config节点的改变来更新自己的配置。

Servers节点,用于服务发现,每个Work Server在启动时都会在Servers节点下创建一个临时节点,Manage Server充当monitor,通过监听Servers节点的子节点列表的变化来更新自己内存中工作服务器列表的信息。

Control Server以command节点作为中介,向Manage Server发送控制指令:Control Server向command节点写入命令信息,Manage Server订阅command节点的数据改变来监听并执行命令。

流程图

Manage Server程序主体流程

Zookeeper使用案例 - 图5

核心类图

Zookeeper使用案例 - 图6

WorkServer对应架构图的Work Server;
ManageServer对应架构图的Manage Server;
ServerConfig用于记录Work Server的配置信息;
ServerData用于记录Work Server的基本信息;
SubscribeZkClient作为示例程序入口,负责启动Work Server和Manage Server

实现代码

  1. /**
  2. * 配置信息
  3. */
  4. public class ServerConfig {
  5. private String dbUrl;
  6. private String dbPwd;
  7. private String dbUser;
  8. public String getDbUrl() {
  9. return dbUrl;
  10. }
  11. public void setDbUrl(String dbUrl) {
  12. this.dbUrl = dbUrl;
  13. }
  14. public String getDbPwd() {
  15. return dbPwd;
  16. }
  17. public void setDbPwd(String dbPwd) {
  18. this.dbPwd = dbPwd;
  19. }
  20. public String getDbUser() {
  21. return dbUser;
  22. }
  23. public void setDbUser(String dbUser) {
  24. this.dbUser = dbUser;
  25. }
  26. @Override
  27. public String toString() {
  28. return "ServerConfig [dbUrl=" + dbUrl + ", dbPwd=" + dbPwd
  29. + ", dbUser=" + dbUser + "]";
  30. }
  31. }
  1. /**
  2. * 服务器基本信息
  3. */
  4. public class ServerData {
  5. private String address;
  6. private Integer id;
  7. private String name;
  8. public String getAddress() {
  9. return address;
  10. }
  11. public void setAddress(String address) {
  12. this.address = address;
  13. }
  14. public Integer getId() {
  15. return id;
  16. }
  17. public void setId(Integer id) {
  18. this.id = id;
  19. }
  20. public String getName() {
  21. return name;
  22. }
  23. public void setName(String name) {
  24. this.name = name;
  25. }
  26. @Override
  27. public String toString() {
  28. return "ServerData [address=" + address + ", id=" + id + ", name="
  29. + name + "]";
  30. }
  31. }
  1. /**
  2. * 代表工作服务器
  3. */
  4. public class WorkServer {
  5. private ZkClient zkClient;
  6. // ZooKeeper
  7. private String configPath;
  8. // ZooKeeper集群中servers节点的路径
  9. private String serversPath;
  10. // 当前工作服务器的基本信息
  11. private ServerData serverData;
  12. // 当前工作服务器的配置信息
  13. private ServerConfig serverConfig;
  14. private IZkDataListener dataListener;
  15. public WorkServer(String configPath, String serversPath,
  16. ServerData serverData, ZkClient zkClient, ServerConfig initConfig) {
  17. this.zkClient = zkClient;
  18. this.serversPath = serversPath;
  19. this.configPath = configPath;
  20. this.serverConfig = initConfig;
  21. this.serverData = serverData;
  22. this.dataListener = new IZkDataListener() {
  23. public void handleDataDeleted(String dataPath) throws Exception {
  24. }
  25. public void handleDataChange(String dataPath, Object data)
  26. throws Exception {
  27. String retJson = new String((byte[])data);
  28. ServerConfig serverConfigLocal = (ServerConfig)JSON.parseObject(retJson,ServerConfig.class);
  29. updateConfig(serverConfigLocal);
  30. System.out.println("new Work server config is:"+serverConfig.toString());
  31. }
  32. };
  33. }
  34. // 启动服务器
  35. public void start() {
  36. System.out.println("work server start...");
  37. initRunning();
  38. }
  39. // 停止服务器
  40. public void stop() {
  41. System.out.println("work server stop...");
  42. zkClient.unsubscribeDataChanges(configPath, dataListener); // 取消监听config节点
  43. }
  44. // 服务器初始化
  45. private void initRunning() {
  46. registMe(); // 注册自己
  47. zkClient.subscribeDataChanges(configPath, dataListener); // 订阅config节点的改变事件
  48. }
  49. // 启动时向zookeeper注册自己的注册函数
  50. private void registMe() {
  51. String mePath = serversPath.concat("/").concat(serverData.getAddress());
  52. try {
  53. zkClient.createEphemeral(mePath, JSON.toJSONString(serverData)
  54. .getBytes());
  55. } catch (ZkNoNodeException e) {
  56. zkClient.createPersistent(serversPath, true);
  57. registMe();
  58. }
  59. }
  60. // 更新自己的配置信息
  61. private void updateConfig(ServerConfig serverConfig) {
  62. this.serverConfig = serverConfig;
  63. }
  64. }
  1. public class ManageServer {
  2. // zookeeper的servers节点路径
  3. private String serversPath;
  4. // zookeeper的command节点路径
  5. private String commandPath;
  6. // zookeeper的config节点路径
  7. private String configPath;
  8. private ZkClient zkClient;
  9. private ServerConfig config;
  10. // 用于监听servers节点的子节点列表的变化
  11. private IZkChildListener childListener;
  12. // 用于监听command节点数据内容的变化
  13. private IZkDataListener dataListener;
  14. // 工作服务器的列表
  15. private List<String> workServerList;
  16. public ManageServer(String serversPath, String commandPath,
  17. String configPath, ZkClient zkClient, ServerConfig config) {
  18. this.serversPath = serversPath;
  19. this.commandPath = commandPath;
  20. this.zkClient = zkClient;
  21. this.config = config;
  22. this.configPath = configPath;
  23. this.childListener = new IZkChildListener() {
  24. public void handleChildChange(String parentPath,
  25. List<String> currentChilds) throws Exception {
  26. // TODO Auto-generated method stub
  27. workServerList = currentChilds; // 更新内存中工作服务器列表
  28. System.out.println("work server list changed, new list is ");
  29. execList();
  30. }
  31. };
  32. this.dataListener = new IZkDataListener() {
  33. public void handleDataDeleted(String dataPath) throws Exception {
  34. // TODO Auto-generated method stub
  35. // ignore;
  36. }
  37. public void handleDataChange(String dataPath, Object data)
  38. throws Exception {
  39. // TODO Auto-generated method stub
  40. String cmd = new String((byte[]) data);
  41. System.out.println("cmd:"+cmd);
  42. exeCmd(cmd); // 执行命令
  43. }
  44. };
  45. }
  46. private void initRunning() {
  47. zkClient.subscribeDataChanges(commandPath, dataListener);
  48. zkClient.subscribeChildChanges(serversPath, childListener);
  49. }
  50. /*
  51. * 1: list 2: create 3: modify
  52. */
  53. private void exeCmd(String cmdType) {
  54. if ("list".equals(cmdType)) {
  55. execList();
  56. } else if ("create".equals(cmdType)) {
  57. execCreate();
  58. } else if ("modify".equals(cmdType)) {
  59. execModify();
  60. } else {
  61. System.out.println("error command!" + cmdType);
  62. }
  63. }
  64. // 列出工作服务器列表
  65. private void execList() {
  66. System.out.println(workServerList.toString());
  67. }
  68. // 创建config节点
  69. private void execCreate() {
  70. if (!zkClient.exists(configPath)) {
  71. try {
  72. zkClient.createPersistent(configPath, JSON.toJSONString(config)
  73. .getBytes());
  74. } catch (ZkNodeExistsException e) {
  75. zkClient.writeData(configPath, JSON.toJSONString(config)
  76. .getBytes()); // config节点已经存在,则写入内容就可以了
  77. } catch (ZkNoNodeException e) {
  78. String parentDir = configPath.substring(0,
  79. configPath.lastIndexOf('/'));
  80. zkClient.createPersistent(parentDir, true);
  81. execCreate();
  82. }
  83. }
  84. }
  85. // 修改config节点内容
  86. private void execModify() {
  87. // 我们随意修改config的一个属性就可以了
  88. config.setDbUser(config.getDbUser() + "_modify");
  89. try {
  90. zkClient.writeData(configPath, JSON.toJSONString(config).getBytes());
  91. } catch (ZkNoNodeException e) {
  92. execCreate(); // 写入时config节点还未存在,则创建它
  93. }
  94. }
  95. // 启动工作服务器
  96. public void start() {
  97. initRunning();
  98. }
  99. // 停止工作服务器
  100. public void stop() {
  101. zkClient.unsubscribeChildChanges(serversPath, childListener);
  102. zkClient.unsubscribeDataChanges(commandPath, dataListener);
  103. }
  104. }
  1. /**
  2. * 调度类
  3. */
  4. public class SubscribeZkClient {
  5. private static final int CLIENT_QTY = 5; // Work Server数量
  6. private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";
  7. private static final String CONFIG_PATH = "/config";
  8. private static final String COMMAND_PATH = "/command";
  9. private static final String SERVERS_PATH = "/servers";
  10. public static void main(String[] args) throws Exception {
  11. List<ZkClient> clients = new ArrayList<ZkClient>();
  12. List<WorkServer> workServers = new ArrayList<WorkServer>();
  13. ManageServer manageServer = null;
  14. try {
  15. // 创建一个默认的配置
  16. ServerConfig initConfig = new ServerConfig();
  17. initConfig.setDbPwd("123456");
  18. initConfig.setDbUrl("jdbc:mysql://localhost:3306/mydb");
  19. initConfig.setDbUser("root");
  20. // 实例化一个Manage Server
  21. ZkClient clientManage = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());
  22. manageServer = new ManageServer(SERVERS_PATH, COMMAND_PATH,CONFIG_PATH,clientManage,initConfig);
  23. manageServer.start(); // 启动Manage Server
  24. // 创建指定个数的工作服务器
  25. for ( int i = 0; i < CLIENT_QTY; ++i ) {
  26. ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());
  27. clients.add(client);
  28. ServerData serverData = new ServerData();
  29. serverData.setId(i);
  30. serverData.setName("WorkServer#"+i);
  31. serverData.setAddress("192.168.1."+i);
  32. WorkServer workServer = new WorkServer(CONFIG_PATH, SERVERS_PATH, serverData, client, initConfig);
  33. workServers.add(workServer);
  34. workServer.start(); // 启动工作服务器
  35. }
  36. System.out.println("敲回车键退出!\n");
  37. new BufferedReader(new InputStreamReader(System.in)).readLine();
  38. } finally {
  39. System.out.println("Shutting down...");
  40. for ( WorkServer workServer : workServers ) {
  41. try {
  42. workServer.stop();
  43. } catch (Exception e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. for ( ZkClient client : clients ) {
  48. try {
  49. client.close();
  50. } catch (Exception e) {
  51. e.printStackTrace();
  52. }
  53. }
  54. }
  55. }
  56. }

我们用zookeeper的命令行客户端向Manage Server下达指令。
执行zkCli命令:

  1. create /command list

java控制台输出:

  1. cmd:list
  2. [192.168.1.1, 192.168.1.0, 192.168.1.3, 192.168.1.2, 192.168.1.4]

执行zkCli命令:

  1. set /command create

java控制台输出:

  1. cmd:create
  2. new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
  3. new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
  4. new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
  5. new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
  6. new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]

执行zkCli命令:

  1. set /command modify

java控制台输出:

  1. cmd:modify
  2. new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root]
  3. new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root]
  4. new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root]
  5. new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root]
  6. new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modi

3. 负载均衡

负载均衡是一种手段,用来把对某种资源的访问分摊给不同的设备,从而减轻单点的压力。

架构图

Zookeeper使用案例 - 图7

图中左侧为ZooKeeper集群,右侧上方为工作服务器,下面为客户端。每台工作服务器在启动时都会去zookeeper的servers节点下注册临时节点,每台客户端在启动时都会去servers节点下取得所有可用的工作服务器列表,并通过一定的负载均衡算法计算得出一台工作服务器,并与之建立网络连接。网络连接我们采用开源框架netty。

流程图

负载均衡客户端流程

Zookeeper使用案例 - 图8

服务端主体流程

Zookeeper使用案例 - 图9

类图

Server端核心类

Zookeeper使用案例 - 图10
每个服务端对应一个Server接口,ServerImpl是服务端的实现类。把服务端启动时的注册过程抽出为一个接口RegistProvider,并给予一个默认实现DefaultRegistProvider,它将用到一个上下文的类ZooKeeperRegistContext。我们的服务端是基于Netty的,它需要ServerHandler来处理与客户端之间的连接,当有客户端建立或失去连接时,我们都需要去修改当前服务器的负载信息,我们把修改负载信息的过程也抽出为一个接口BalanceUpdateProvider,并且给予了一个默认实现DefaultBalanceUpdateProvider。ServerRunner是调度类,负责调度我们的Server。

Client端核心类

Zookeeper使用案例 - 图11
每个客户端都需要实现一个Client接口,ClientImpl是实现,Client需要ClientHandler来处理与服务器之间的通讯,同时它需要BalanceProvider为它提供负载均衡的算法。BalanceProvider是接口,它有2个实现类,一个是抽象的实现AbstractBalanceProvider,一个是默认的实现DefaultBalanceProvider。ServerData是服务端和客户端共用的一个类,服务端会把自己的基本信息,包括负载信息,打包成ServerData并写入到zookeeper中,客户端在计算负载的时候需要到zookeeper中拿到ServerData,并取得服务端的地址和负载信息。ClientRunner是客户端的调度类,负责启动客户端。

代码实现

先是Server端的代码:

  1. public class ServerData implements Serializable,Comparable<ServerData> {
  2. private static final long serialVersionUID = -8892569870391530906L;
  3. private Integer balance;
  4. private String host;
  5. private Integer port;
  6. public Integer getBalance() {
  7. return balance;
  8. }
  9. public void setBalance(Integer balance) {
  10. this.balance = balance;
  11. }
  12. public String getHost() {
  13. return host;
  14. }
  15. public void setHost(String host) {
  16. this.host = host;
  17. }
  18. public Integer getPort() {
  19. return port;
  20. }
  21. public void setPort(Integer port) {
  22. this.port = port;
  23. }
  24. @Override
  25. public String toString() {
  26. return "ServerData [balance=" + balance + ", host=" + host + ", port="
  27. + port + "]";
  28. }
  29. public int compareTo(ServerData o) {
  30. return this.getBalance().compareTo(o.getBalance());
  31. }
  32. }
  1. public interface Server {
  2. public void bind();
  3. }
  1. public class ServerImpl implements Server {
  2. private EventLoopGroup bossGroup = new NioEventLoopGroup();
  3. private EventLoopGroup workGroup = new NioEventLoopGroup();
  4. private ServerBootstrap bootStrap = new ServerBootstrap();
  5. private ChannelFuture cf;
  6. private String zkAddress;
  7. private String serversPath;
  8. private String currentServerPath;
  9. private ServerData sd;
  10. private volatile boolean binded = false;
  11. private final ZkClient zc;
  12. private final RegistProvider registProvider;
  13. private static final Integer SESSION_TIME_OUT = 10000;
  14. private static final Integer CONNECT_TIME_OUT = 10000;
  15. public String getCurrentServerPath() {
  16. return currentServerPath;
  17. }
  18. public String getZkAddress() {
  19. return zkAddress;
  20. }
  21. public String getServersPath() {
  22. return serversPath;
  23. }
  24. public ServerData getSd() {
  25. return sd;
  26. }
  27. public void setSd(ServerData sd) {
  28. this.sd = sd;
  29. }
  30. public ServerImpl(String zkAddress, String serversPath, ServerData sd){
  31. this.zkAddress = zkAddress;
  32. this.serversPath = serversPath;
  33. this.zc = new ZkClient(this.zkAddress,SESSION_TIME_OUT,CONNECT_TIME_OUT,new SerializableSerializer());
  34. this.registProvider = new DefaultRegistProvider();
  35. this.sd = sd;
  36. }
  37. //初始化服务端
  38. private void initRunning() throws Exception {
  39. String mePath = serversPath.concat("/").concat(sd.getPort().toString());
  40. //注册到zookeeper
  41. registProvider.regist(new ZooKeeperRegistContext(mePath,zc,sd));
  42. currentServerPath = mePath;
  43. }
  44. public void bind() {
  45. if (binded){
  46. return;
  47. }
  48. System.out.println(sd.getPort()+":binding...");
  49. try {
  50. initRunning();
  51. } catch (Exception e) {
  52. e.printStackTrace();
  53. return;
  54. }
  55. bootStrap.group(bossGroup,workGroup)
  56. .channel(NioServerSocketChannel.class)
  57. .option(ChannelOption.SO_BACKLOG, 1024)
  58. .childHandler(new ChannelInitializer<SocketChannel>() {
  59. @Override
  60. public void initChannel(SocketChannel ch) throws Exception {
  61. ChannelPipeline p = ch.pipeline();
  62. p.addLast(new ServerHandler(new DefaultBalanceUpdateProvider(currentServerPath,zc)));
  63. }
  64. });
  65. try {
  66. cf = bootStrap.bind(sd.getPort()).sync();
  67. binded = true;
  68. System.out.println(sd.getPort()+":binded...");
  69. cf.channel().closeFuture().sync();
  70. } catch (InterruptedException e) {
  71. e.printStackTrace();
  72. }finally{
  73. bossGroup.shutdownGracefully();
  74. workGroup.shutdownGracefully();
  75. }
  76. }
  77. }
  1. public interface RegistProvider {
  2. public void regist(Object context) throws Exception;
  3. public void unRegist(Object context) throws Exception;
  4. }
  1. public class DefaultRegistProvider implements RegistProvider {
  2. // 在zookeeper中创建临时节点并写入信息
  3. public void regist(Object context) throws Exception {
  4. // Server在zookeeper中注册自己,需要在zookeeper的目标节点上创建临时节点并写入自己
  5. // 将需要的以下3个信息包装成上下文传入
  6. // 1:path
  7. // 2:zkClient
  8. // 3:serverData
  9. ZooKeeperRegistContext registContext = (ZooKeeperRegistContext) context;
  10. String path = registContext.getPath();
  11. ZkClient zc = registContext.getZkClient();
  12. try {
  13. zc.createEphemeral(path, registContext.getData());
  14. } catch (ZkNoNodeException e) {
  15. String parentDir = path.substring(0, path.lastIndexOf('/'));
  16. zc.createPersistent(parentDir, true);
  17. regist(registContext);
  18. }
  19. }
  20. public void unRegist(Object context) throws Exception {
  21. return;
  22. }
  23. }
  1. public class ZooKeeperRegistContext {
  2. private String path;
  3. private ZkClient zkClient;
  4. private Object data;
  5. public ZooKeeperRegistContext(String path, ZkClient zkClient, Object data) {
  6. super();
  7. this.path = path;
  8. this.zkClient = zkClient;
  9. this.data = data;
  10. }
  11. public String getPath() {
  12. return path;
  13. }
  14. public void setPath(String path) {
  15. this.path = path;
  16. }
  17. public ZkClient getZkClient() {
  18. return zkClient;
  19. }
  20. public void setZkClient(ZkClient zkClient) {
  21. this.zkClient = zkClient;
  22. }
  23. public Object getData() {
  24. return data;
  25. }
  26. public void setData(Object data) {
  27. this.data = data;
  28. }
  29. }
  1. /**
  2. * 处理服务端与客户端之间的通信
  3. */
  4. public class ServerHandler extends ChannelHandlerAdapter{
  5. private final BalanceUpdateProvider balanceUpdater;
  6. private static final Integer BALANCE_STEP = 1;
  7. public ServerHandler(BalanceUpdateProvider balanceUpdater){
  8. this.balanceUpdater = balanceUpdater;
  9. }
  10. public BalanceUpdateProvider getBalanceUpdater() {
  11. return balanceUpdater;
  12. }
  13. // 建立连接时增加负载
  14. @Override
  15. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  16. System.out.println("one client connect...");
  17. balanceUpdater.addBalance(BALANCE_STEP);
  18. }
  19. // 断开连接时减少负载
  20. @Override
  21. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  22. balanceUpdater.reduceBalance(BALANCE_STEP);
  23. }
  24. @Override
  25. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  26. cause.printStackTrace();
  27. ctx.close();
  28. }
  29. }
  1. public interface BalanceUpdateProvider {
  2. // 增加负载
  3. public boolean addBalance(Integer step);
  4. // 减少负载
  5. public boolean reduceBalance(Integer step);
  6. }
  1. public class DefaultBalanceUpdateProvider implements BalanceUpdateProvider {
  2. private String serverPath;
  3. private ZkClient zc;
  4. public DefaultBalanceUpdateProvider(String serverPath, ZkClient zkClient) {
  5. this.serverPath = serverPath;
  6. this.zc = zkClient;
  7. }
  8. public boolean addBalance(Integer step) {
  9. Stat stat = new Stat();
  10. ServerData sd;
  11. // 增加负载:读取服务器的信息ServerData,增加负载,并写回zookeeper
  12. while (true) {
  13. try {
  14. sd = zc.readData(this.serverPath, stat);
  15. sd.setBalance(sd.getBalance() + step);
  16. // 带上版本,因为可能有其他客户端连接到服务器修改了负载
  17. zc.writeData(this.serverPath, sd, stat.getVersion());
  18. return true;
  19. } catch (ZkBadVersionException e) {
  20. // ignore
  21. } catch (Exception e) {
  22. return false;
  23. }
  24. }
  25. }
  26. public boolean reduceBalance(Integer step) {
  27. Stat stat = new Stat();
  28. ServerData sd;
  29. while (true) {
  30. try {
  31. sd = zc.readData(this.serverPath, stat);
  32. final Integer currBalance = sd.getBalance();
  33. sd.setBalance(currBalance>step?currBalance-step:0);
  34. zc.writeData(this.serverPath, sd, stat.getVersion());
  35. return true;
  36. } catch (ZkBadVersionException e) {
  37. // ignore
  38. } catch (Exception e) {
  39. return false;
  40. }
  41. }
  42. }
  43. }
  1. /**
  2. * 用于测试,负责启动Work Server
  3. */
  4. public class ServerRunner {
  5. private static final int SERVER_QTY = 2;
  6. private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";
  7. private static final String SERVERS_PATH = "/servers";
  8. public static void main(String[] args) {
  9. List<Thread> threadList = new ArrayList<Thread>();
  10. for(int i=0; i<SERVER_QTY; i++){
  11. final Integer count = i;
  12. Thread thread = new Thread(new Runnable() {
  13. public void run() {
  14. ServerData sd = new ServerData();
  15. sd.setBalance(0);
  16. sd.setHost("127.0.0.1");
  17. sd.setPort(6000+count);
  18. Server server = new ServerImpl(ZOOKEEPER_SERVER,SERVERS_PATH,sd);
  19. server.bind();
  20. }
  21. });
  22. threadList.add(thread);
  23. thread.start();
  24. }
  25. for (int i=0; i<threadList.size(); i++){
  26. try {
  27. threadList.get(i).join();
  28. } catch (InterruptedException ignore) {
  29. //
  30. }
  31. }
  32. }
  33. }

再看Client端的代码:

  1. public interface Client {
  2. // 连接服务器
  3. public void connect() throws Exception;
  4. // 断开服务器
  5. public void disConnect() throws Exception;
  6. }
  1. public class ClientImpl implements Client {
  2. private final BalanceProvider<ServerData> provider;
  3. private EventLoopGroup group = null;
  4. private Channel channel = null;
  5. private final Logger log = LoggerFactory.getLogger(getClass());
  6. public ClientImpl(BalanceProvider<ServerData> provider) {
  7. this.provider = provider;
  8. }
  9. public BalanceProvider<ServerData> getProvider() {
  10. return provider;
  11. }
  12. public void connect(){
  13. try{
  14. ServerData serverData = provider.getBalanceItem(); // 获取负载最小的服务器信息,并与之建立连接
  15. System.out.println("connecting to "+serverData.getHost()+":"+serverData.getPort()+", it's balance:"+serverData.getBalance());
  16. group = new NioEventLoopGroup();
  17. Bootstrap b = new Bootstrap();
  18. b.group(group)
  19. .channel(NioSocketChannel.class)
  20. .handler(new ChannelInitializer<SocketChannel>() {
  21. @Override
  22. public void initChannel(SocketChannel ch) throws Exception {
  23. ChannelPipeline p = ch.pipeline();
  24. p.addLast(new ClientHandler());
  25. }
  26. });
  27. ChannelFuture f = b.connect(serverData.getHost(),serverData.getPort()).syncUninterruptibly();
  28. channel = f.channel();
  29. System.out.println("started success!");
  30. }catch(Exception e){
  31. System.out.println("连接异常:"+e.getMessage());
  32. }
  33. }
  34. public void disConnect(){
  35. try{
  36. if (channel!=null)
  37. channel.close().syncUninterruptibly();
  38. group.shutdownGracefully();
  39. group = null;
  40. log.debug("disconnected!");
  41. }catch(Exception e){
  42. log.error(e.getMessage());
  43. }
  44. }
  45. }
  1. public class ClientHandler extends ChannelHandlerAdapter {
  2. @Override
  3. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  4. // Close the connection when an exception is raised.
  5. cause.printStackTrace();
  6. ctx.close();
  7. }
  8. }
  1. public interface BalanceProvider<T> {
  2. public T getBalanceItem();
  3. }
  1. public abstract class AbstractBalanceProvider<T> implements BalanceProvider<T> {
  2. protected abstract T balanceAlgorithm(List<T> items);
  3. protected abstract List<T> getBalanceItems();
  4. public T getBalanceItem(){
  5. return balanceAlgorithm(getBalanceItems());
  6. }
  7. }
  1. public class DefaultBalanceProvider extends AbstractBalanceProvider<ServerData> {
  2. private final String zkServer; // zookeeper服务器地址
  3. private final String serversPath; // servers节点路径
  4. private final ZkClient zc;
  5. private static final Integer SESSION_TIME_OUT = 10000;
  6. private static final Integer CONNECT_TIME_OUT = 10000;
  7. public DefaultBalanceProvider(String zkServer, String serversPath) {
  8. this.serversPath = serversPath;
  9. this.zkServer = zkServer;
  10. this.zc = new ZkClient(this.zkServer, SESSION_TIME_OUT, CONNECT_TIME_OUT,
  11. new SerializableSerializer());
  12. }
  13. @Override
  14. protected ServerData balanceAlgorithm(List<ServerData> items) {
  15. if (items.size()>0){
  16. Collections.sort(items); // 根据负载由小到大排序
  17. return items.get(0); // 返回负载最小的那个
  18. }else{
  19. return null;
  20. }
  21. }
  22. /**
  23. * 从zookeeper中拿到所有工作服务器的基本信息
  24. */
  25. @Override
  26. protected List<ServerData> getBalanceItems() {
  27. List<ServerData> sdList = new ArrayList<ServerData>();
  28. List<String> children = zc.getChildren(this.serversPath);
  29. for(int i=0; i<children.size();i++){
  30. ServerData sd = zc.readData(serversPath+"/"+children.get(i));
  31. sdList.add(sd);
  32. }
  33. return sdList;
  34. }
  35. }
  1. public class ClientRunner {
  2. private static final int CLIENT_QTY = 3;
  3. private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";
  4. private static final String SERVERS_PATH = "/servers";
  5. public static void main(String[] args) {
  6. List<Thread> threadList = new ArrayList<Thread>(CLIENT_QTY);
  7. final List<Client> clientList = new ArrayList<Client>();
  8. final BalanceProvider<ServerData> balanceProvider = new DefaultBalanceProvider(ZOOKEEPER_SERVER, SERVERS_PATH);
  9. try{
  10. for(int i=0; i<CLIENT_QTY; i++){
  11. Thread thread = new Thread(new Runnable() {
  12. public void run() {
  13. Client client = new ClientImpl(balanceProvider);
  14. clientList.add(client);
  15. try {
  16. client.connect();
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. });
  22. threadList.add(thread);
  23. thread.start();
  24. //延时
  25. Thread.sleep(2000);
  26. }
  27. System.out.println("敲回车键退出!\n");
  28. new BufferedReader(new InputStreamReader(System.in)).readLine();
  29. }catch(Exception e){
  30. e.printStackTrace();
  31. }finally{
  32. //关闭客户端
  33. for (int i=0; i<clientList.size(); i++){
  34. try {
  35. clientList.get(i);
  36. clientList.get(i).disConnect();
  37. } catch (Exception ignore) {
  38. //ignore
  39. }
  40. }
  41. //关闭线程
  42. for (int i=0; i<threadList.size(); i++){
  43. threadList.get(i).interrupt();
  44. try{
  45. threadList.get(i).join();
  46. }catch (InterruptedException e){
  47. //ignore
  48. }
  49. }
  50. }
  51. }
  52. }

我们先启动服务端ServerRunner

  1. 6000:binding...
  2. 6000:binded...
  3. 6001:binding...
  4. 6001:binded...

再来启动客户端ClientRunner

  1. connecting to 127.0.0.1:6000, it's balance 0
  2. started success!
  3. connecting to 127.0.0.1:6001, it's balance 0
  4. started success!
  5. connecting to 127.0.0.1:6000, it's balance 1
  6. started success!
  7. 敲回车退出!

4. 分布式锁

我们常说的锁是单进程多线程锁,在多线程并发编程中,用于线程之间的数据同步,保护共享资源的访问。而分布式锁,指在分布式环境下,保护跨进程、跨主机、跨网络的共享资源,实现互斥访问,保证一致性。

架构图

Zookeeper使用案例 - 图12

左侧是zookeeper集群,locker是数据节点,node_1到node_n代表一系列的顺序节点。

右侧client_1至client_n代表客户端,Service代表需要互斥访问的服务。

总实现思路,是在获取锁的时候在locker节点下创建顺序节点,在释放锁的时候,把自己创建的节点删除。

流程图

Zookeeper使用案例 - 图13

类图

代码实现

  1. public interface DistributedLock {
  2. /*
  3. * 获取锁,如果没有得到就等待
  4. */
  5. public void acquire() throws Exception;
  6. /*
  7. * 获取锁,直到超时
  8. */
  9. public boolean acquire(long time, TimeUnit unit) throws Exception;
  10. /*
  11. * 释放锁
  12. */
  13. public void release() throws Exception;
  14. }
  1. public class SimpleDistributedLockMutex extends BaseDistributedLock implements
  2. DistributedLock {
  3. //锁名称前缀,成功创建的顺序节点如lock-0000000000,lock-0000000001,...
  4. private static final String LOCK_NAME = "lock-";
  5. // zookeeper中locker节点的路径
  6. private final String basePath;
  7. // 获取锁以后自己创建的那个顺序节点的路径
  8. private String ourLockPath;
  9. private boolean internalLock(long time, TimeUnit unit) throws Exception {
  10. ourLockPath = attemptLock(time, unit);
  11. return ourLockPath != null;
  12. }
  13. public SimpleDistributedLockMutex(ZkClientExt client, String basePath){
  14. super(client,basePath,LOCK_NAME);
  15. this.basePath = basePath;
  16. }
  17. // 获取锁
  18. public void acquire() throws Exception {
  19. if ( !internalLock(-1, null) ) {
  20. throw new IOException("连接丢失!在路径:'"+basePath+"'下不能获取锁!");
  21. }
  22. }
  23. // 获取锁,可以超时
  24. public boolean acquire(long time, TimeUnit unit) throws Exception {
  25. return internalLock(time, unit);
  26. }
  27. // 释放锁
  28. public void release() throws Exception {
  29. releaseLock(ourLockPath);
  30. }
  31. }
  1. public class BaseDistributedLock {
  2. private final ZkClientExt client;
  3. private final String path;
  4. private final String basePath;
  5. private final String lockName;
  6. private static final Integer MAX_RETRY_COUNT = 10;
  7. public BaseDistributedLock(ZkClientExt client, String path, String lockName){
  8. this.client = client;
  9. this.basePath = path;
  10. this.path = path.concat("/").concat(lockName);
  11. this.lockName = lockName;
  12. }
  13. // 删除成功获取锁之后所创建的那个顺序节点
  14. private void deleteOurPath(String ourPath) throws Exception{
  15. client.delete(ourPath);
  16. }
  17. // 创建临时顺序节点
  18. private String createLockNode(ZkClient client, String path) throws Exception{
  19. return client.createEphemeralSequential(path, null);
  20. }
  21. // 等待比自己次小的顺序节点的删除
  22. private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception{
  23. boolean haveTheLock = false;
  24. boolean doDelete = false;
  25. try {
  26. while ( !haveTheLock ) {
  27. // 获取/locker下的经过排序的子节点列表
  28. List<String> children = getSortedChildren();
  29. // 获取刚才自己创建的那个顺序节点名
  30. String sequenceNodeName = ourPath.substring(basePath.length()+1);
  31. // 判断自己排第几个
  32. int ourIndex = children.indexOf(sequenceNodeName);
  33. if (ourIndex < 0){ // 网络抖动,获取到的子节点列表里可能已经没有自己了
  34. throw new ZkNoNodeException("节点没有找到: " + sequenceNodeName);
  35. }
  36. // 如果是第一个,代表自己已经获得了锁
  37. boolean isGetTheLock = ourIndex == 0;
  38. // 如果自己没有获得锁,则要watch比我们次小的那个节点
  39. String pathToWatch = isGetTheLock ? null : children.get(ourIndex - 1);
  40. if ( isGetTheLock ){
  41. haveTheLock = true;
  42. } else {
  43. // 订阅比自己次小顺序节点的删除事件
  44. String previousSequencePath = basePath .concat( "/" ) .concat( pathToWatch );
  45. final CountDownLatch latch = new CountDownLatch(1);
  46. final IZkDataListener previousListener = new IZkDataListener() {
  47. public void handleDataDeleted(String dataPath) throws Exception {
  48. latch.countDown(); // 删除后结束latch上的await
  49. }
  50. public void handleDataChange(String dataPath, Object data) throws Exception {
  51. // ignore
  52. }
  53. };
  54. try {
  55. //订阅次小顺序节点的删除事件,如果节点不存在会出现异常
  56. client.subscribeDataChanges(previousSequencePath, previousListener);
  57. if ( millisToWait != null ) {
  58. millisToWait -= (System.currentTimeMillis() - startMillis);
  59. startMillis = System.currentTimeMillis();
  60. if ( millisToWait <= 0 ) {
  61. doDelete = true; // timed out - delete our node
  62. break;
  63. }
  64. latch.await(millisToWait, TimeUnit.MICROSECONDS); // 在latch上await
  65. } else {
  66. latch.await(); // 在latch上await
  67. }
  68. // 结束latch上的等待后,继续while重新来过判断自己是否第一个顺序节点
  69. }
  70. catch ( ZkNoNodeException e ) {
  71. //ignore
  72. } finally {
  73. client.unsubscribeDataChanges(previousSequencePath, previousListener);
  74. }
  75. }
  76. }
  77. }
  78. catch ( Exception e ) {
  79. //发生异常需要删除节点
  80. doDelete = true;
  81. throw e;
  82. } finally {
  83. //如果需要删除节点
  84. if ( doDelete ) {
  85. deleteOurPath(ourPath);
  86. }
  87. }
  88. return haveTheLock;
  89. }
  90. private String getLockNodeNumber(String str, String lockName) {
  91. int index = str.lastIndexOf(lockName);
  92. if ( index >= 0 ) {
  93. index += lockName.length();
  94. return index <= str.length() ? str.substring(index) : "";
  95. }
  96. return str;
  97. }
  98. // 获取/locker下的经过排序的子节点列表
  99. List<String> getSortedChildren() throws Exception {
  100. try{
  101. List<String> children = client.getChildren(basePath);
  102. Collections.sort(
  103. children, new Comparator<String>() {
  104. public int compare(String lhs, String rhs) {
  105. return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName));
  106. }
  107. }
  108. );
  109. return children;
  110. } catch (ZkNoNodeException e){
  111. client.createPersistent(basePath, true);
  112. return getSortedChildren();
  113. }
  114. }
  115. protected void releaseLock(String lockPath) throws Exception{
  116. deleteOurPath(lockPath);
  117. }
  118. protected String attemptLock(long time, TimeUnit unit) throws Exception {
  119. final long startMillis = System.currentTimeMillis();
  120. final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
  121. String ourPath = null;
  122. boolean hasTheLock = false;
  123. boolean isDone = false;
  124. int retryCount = 0;
  125. //网络闪断需要重试一试
  126. while ( !isDone ) {
  127. isDone = true;
  128. try {
  129. // 在/locker下创建临时的顺序节点
  130. ourPath = createLockNode(client, path);
  131. // 判断自己是否获得了锁,如果没有获得那么等待直到获得锁或者超时
  132. hasTheLock = waitToLock(startMillis, millisToWait, ourPath);
  133. } catch ( ZkNoNodeException e ) { // 捕获这个异常
  134. if ( retryCount++ < MAX_RETRY_COUNT ) { // 重试指定次数
  135. isDone = false;
  136. } else {
  137. throw e;
  138. }
  139. }
  140. }
  141. if ( hasTheLock ) {
  142. return ourPath;
  143. }
  144. return null;
  145. }
  146. }
  1. public class TestDistributedLock {
  2. public static void main(String[] args) {
  3. final ZkClientExt zkClientExt1 = new ZkClientExt("192.168.1.105:2181", 5000, 5000, new BytesPushThroughSerializer());
  4. final SimpleDistributedLockMutex mutex1 = new SimpleDistributedLockMutex(zkClientExt1, "/Mutex");
  5. final ZkClientExt zkClientExt2 = new ZkClientExt("192.168.1.105:2181", 5000, 5000, new BytesPushThroughSerializer());
  6. final SimpleDistributedLockMutex mutex2 = new SimpleDistributedLockMutex(zkClientExt2, "/Mutex");
  7. try {
  8. mutex1.acquire();
  9. System.out.println("Client1 locked");
  10. Thread client2Thd = new Thread(new Runnable() {
  11. public void run() {
  12. try {
  13. mutex2.acquire();
  14. System.out.println("Client2 locked");
  15. mutex2.release();
  16. System.out.println("Client2 released lock");
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. });
  22. client2Thd.start();
  23. Thread.sleep(5000);
  24. mutex1.release();
  25. System.out.println("Client1 released lock");
  26. client2Thd.join();
  27. } catch (Exception e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }
  1. public class ZkClientExt extends ZkClient {
  2. public ZkClientExt(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer) {
  3. super(zkServers, sessionTimeout, connectionTimeout, zkSerializer);
  4. }
  5. @Override
  6. public void watchForData(final String path) {
  7. retryUntilConnected(new Callable<Object>() {
  8. public Object call() throws Exception {
  9. Stat stat = new Stat();
  10. _connection.readData(path, stat, true);
  11. return null;
  12. }
  13. });
  14. }
  15. }

5. 分布式队列

在传统的单进程编程中,我们使用队列来存储一些数据结构,用来在多线程之间共享或传递数据。

分布式环境下,我们同样需要一个类似单进程队列的组件,用来实现跨进程、跨主机、跨网络的数据共享和数据传递,这就是我们的分布式队列。

zookeeper可以通过顺序节点实现分布式队列。

架构图

Zookeeper使用案例 - 图14

图中左侧代表zookeeper集群,右侧代表消费者和生产者。
生产者通过在queue节点下创建顺序节点来存放数据,消费者通过读取顺序节点来消费数据。

流程图

offer核心算法流程

Zookeeper使用案例 - 图15

poll核心算法流程

Zookeeper使用案例 - 图16

代码实现

  1. /**
  2. * 简单分布式队列
  3. */
  4. public class DistributedSimpleQueue<T> {
  5. protected final ZkClient zkClient;
  6. // queue节点
  7. protected final String root;
  8. // 顺序节点前缀
  9. protected static final String Node_NAME = "n_";
  10. public DistributedSimpleQueue(ZkClient zkClient, String root) {
  11. this.zkClient = zkClient;
  12. this.root = root;
  13. }
  14. // 判断队列大小
  15. public int size() {
  16. return zkClient.getChildren(root).size();
  17. }
  18. // 判断队列是否为空
  19. public boolean isEmpty() {
  20. return zkClient.getChildren(root).size() == 0;
  21. }
  22. // 向队列提供数据
  23. public boolean offer(T element) throws Exception{
  24. // 创建顺序节点
  25. String nodeFullPath = root .concat( "/" ).concat( Node_NAME );
  26. try {
  27. zkClient.createPersistentSequential(nodeFullPath , element);
  28. }catch (ZkNoNodeException e) {
  29. zkClient.createPersistent(root);
  30. offer(element);
  31. } catch (Exception e) {
  32. throw ExceptionUtil.convertToRuntimeException(e);
  33. }
  34. return true;
  35. }
  36. // 从队列取数据
  37. public T poll() throws Exception {
  38. try {
  39. // 获取所有顺序节点
  40. List<String> list = zkClient.getChildren(root);
  41. if (list.size() == 0) {
  42. return null;
  43. }
  44. // 排序
  45. Collections.sort(list, new Comparator<String>() {
  46. public int compare(String lhs, String rhs) {
  47. return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));
  48. }
  49. });
  50. // 循环每个顺序节点名
  51. for ( String nodeName : list ){
  52. // 构造出顺序节点的完整路径
  53. String nodeFullPath = root.concat("/").concat(nodeName);
  54. try {
  55. // 读取顺序节点的内容
  56. T node = (T) zkClient.readData(nodeFullPath);
  57. // 删除顺序节点
  58. zkClient.delete(nodeFullPath);
  59. return node;
  60. } catch (ZkNoNodeException e) {
  61. // ignore 由其他客户端把这个顺序节点消费掉了
  62. }
  63. }
  64. return null;
  65. } catch (Exception e) {
  66. throw ExceptionUtil.convertToRuntimeException(e);
  67. }
  68. }
  69. private String getNodeNumber(String str, String nodeName) {
  70. int index = str.lastIndexOf(nodeName);
  71. if (index >= 0) {
  72. index += Node_NAME.length();
  73. return index <= str.length() ? str.substring(index) : "";
  74. }
  75. return str;
  76. }
  77. }
  1. public class User implements Serializable {
  2. String name;
  3. String id;
  4. public String getName() {
  5. return name;
  6. }
  7. public void setName(String name) {
  8. this.name = name;
  9. }
  10. public String getId() {
  11. return id;
  12. }
  13. public void setId(String id) {
  14. this.id = id;
  15. }
  16. }
  1. public class TestDistributedSimpleQueue {
  2. public static void main(String[] args) {
  3. ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer());
  4. DistributedSimpleQueue<User> queue = new DistributedSimpleQueue<User>(zkClient,"/Queue");
  5. User user1 = new User();
  6. user1.setId("1");
  7. user1.setName("xiao wang");
  8. User user2 = new User();
  9. user2.setId("2");
  10. user2.setName("xiao wang");
  11. try {
  12. queue.offer(user1);
  13. queue.offer(user2);
  14. User u1 = (User) queue.poll();
  15. User u2 = (User) queue.poll();
  16. if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){
  17. System.out.println("Success!");
  18. }
  19. } catch (Exception e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. }

上面实现了一个简单分布式队列,在此基础上,我们再扩展一个阻塞分布式队列。代码如下:

  1. /**
  2. * 阻塞分布式队列
  3. * 扩展自简单分布式队列,在拿不到队列数据时,进行阻塞直到拿到数据
  4. */
  5. public class DistributedBlockingQueue<T> extends DistributedSimpleQueue<T>{
  6. public DistributedBlockingQueue(ZkClient zkClient, String root) {
  7. super(zkClient, root);
  8. }
  9. @Override
  10. public T poll() throws Exception {
  11. while (true){ // 结束在latch上的等待后,再来一次
  12. final CountDownLatch latch = new CountDownLatch(1);
  13. final IZkChildListener childListener = new IZkChildListener() {
  14. public void handleChildChange(String parentPath, List<String> currentChilds)
  15. throws Exception {
  16. latch.countDown(); // 队列有变化,结束latch上的等待
  17. }
  18. };
  19. zkClient.subscribeChildChanges(root, childListener);
  20. try{
  21. T node = super.poll(); // 获取队列数据
  22. if ( node != null ){
  23. return node;
  24. } else {
  25. latch.await(); // 拿不到队列数据,则在latch上await
  26. }
  27. } finally {
  28. zkClient.unsubscribeChildChanges(root, childListener);
  29. }
  30. }
  31. }
  32. }
  1. public class TestDistributedBlockingQueue {
  2. public static void main(String[] args) {
  3. ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
  4. int delayTime = 5;
  5. ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer());
  6. final DistributedBlockingQueue<User> queue = new DistributedBlockingQueue<User>(zkClient,"/Queue");
  7. final User user1 = new User();
  8. user1.setId("1");
  9. user1.setName("xiao wang");
  10. final User user2 = new User();
  11. user2.setId("2");
  12. user2.setName("xiao wang");
  13. try {
  14. delayExector.schedule(new Runnable() {
  15. public void run() {
  16. try {
  17. queue.offer(user1);
  18. queue.offer(user2);
  19. } catch (Exception e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. }, delayTime , TimeUnit.SECONDS);
  24. System.out.println("ready poll!");
  25. User u1 = (User) queue.poll();
  26. User u2 = (User) queue.poll();
  27. if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){
  28. System.out.println("Success!");
  29. }
  30. } catch (Exception e) {
  31. e.printStackTrace();
  32. } finally{
  33. delayExector.shutdown();
  34. try {
  35. delayExector.awaitTermination(2, TimeUnit.SECONDS);
  36. } catch (InterruptedException e) {
  37. }
  38. }
  39. }
  40. }

6. 分布式命名服务

zookeeper的命名服务有两个应用方向,一个是提供类似JNDI的功能,利用zookeeper的树型分层结构,可以把系统中各种服务的名称、地址以及目录信息存放在zookeeper,需要的时候去zookeeper中读取。

另一个,是利用zookeeper顺序节点的特性,制作分布式的ID生成器,写过数据库应用的朋友都知道,我们在往数据库表中插入记录时,通常需要为该记录创建唯一的ID,在单机环境中我们可以利用数据库的主键自增功能。但在分布式环境则无法使用,有一种方式可以使用UUID,但是它的缺陷是没有规律,很难理解。利用zookeeper顺序节点的特性,我们可以生成有顺序的,容易理解的,同时支持分布式环境的序列号。

代码实现

  1. public class IdMaker {
  2. private ZkClient client = null;
  3. private final String server;
  4. // zookeeper顺序节点的父节点
  5. private final String root;
  6. // 顺序节点的名称
  7. private final String nodeName;
  8. // 标识当前服务是否正在运行
  9. private volatile boolean running = false;
  10. private ExecutorService cleanExector = null;
  11. public enum RemoveMethod{
  12. NONE,IMMEDIATELY,DELAY
  13. }
  14. public IdMaker(String zkServer,String root,String nodeName){
  15. this.root = root;
  16. this.server = zkServer;
  17. this.nodeName = nodeName;
  18. }
  19. // 启动服务
  20. public void start() throws Exception {
  21. if (running)
  22. throw new Exception("server has stated...");
  23. running = true;
  24. init();
  25. }
  26. // 停止服务
  27. public void stop() throws Exception {
  28. if (!running)
  29. throw new Exception("server has stopped...");
  30. running = false;
  31. freeResource();
  32. }
  33. // 初始化服务资源
  34. private void init(){
  35. client = new ZkClient(server,5000,5000,new BytesPushThroughSerializer());
  36. cleanExector = Executors.newFixedThreadPool(10);
  37. try{
  38. client.createPersistent(root,true);
  39. }catch (ZkNodeExistsException e){
  40. //ignore;
  41. }
  42. }
  43. // 释放服务器资源
  44. private void freeResource(){
  45. // 释放线程池
  46. cleanExector.shutdown();
  47. try{
  48. cleanExector.awaitTermination(2, TimeUnit.SECONDS);
  49. }catch(InterruptedException e){
  50. e.printStackTrace();
  51. }finally{
  52. cleanExector = null;
  53. }
  54. if (client!=null){
  55. client.close();
  56. client=null;
  57. }
  58. }
  59. // 检测当前服务是否正在运行
  60. private void checkRunning() throws Exception {
  61. if (!running)
  62. throw new Exception("请先调用start");
  63. }
  64. // 从顺序节点名中提取我们要的ID值
  65. private String ExtractId(String str){
  66. int index = str.lastIndexOf(nodeName);
  67. if (index >= 0){
  68. index+=nodeName.length();
  69. return index <= str.length()?str.substring(index):"";
  70. }
  71. return str;
  72. }
  73. // 生成ID
  74. public String generateId(RemoveMethod removeMethod) throws Exception{
  75. checkRunning();
  76. // 构造顺序节点的完整路径
  77. final String fullNodePath = root.concat("/").concat(nodeName);
  78. // 创建持久化顺序节点
  79. final String ourPath = client.createPersistentSequential(fullNodePath, null);
  80. // 避免zookeeper的顺序节点暴增,直接删除掉刚创建的顺序节点
  81. if (removeMethod.equals(RemoveMethod.IMMEDIATELY)){ // 立即删除
  82. client.delete(ourPath);
  83. }else if (removeMethod.equals(RemoveMethod.DELAY)){ // 延迟删除
  84. cleanExector.execute(new Runnable() { // 用线程池执行删除,让generateId()方法尽快返回
  85. public void run() {
  86. client.delete(ourPath);
  87. }
  88. });
  89. }
  90. //node-0000000000, node-0000000001
  91. return ExtractId(ourPath);
  92. }
  93. }
  1. public class TestIdMaker {
  2. public static void main(String[] args) throws Exception {
  3. IdMaker idMaker = new IdMaker("192.168.1.105:2181",
  4. "/NameService/IdGen", "ID");
  5. idMaker.start();
  6. try {
  7. for (int i = 0; i < 10; i++) {
  8. String id = idMaker.generateId(RemoveMethod.DELAY);
  9. System.out.println(id);
  10. }
  11. } finally {
  12. idMaker.stop();
  13. }
  14. }
  15. }