微服务和分布式

分布式

一种系统部署方式,说白了就是把整个服务拆开然后部署在多台服务器上,来提高服务的性能,满足客户需要。
水平拆分
水平拆分就是分层,把dao层和service层布置在A服务器上,然后把controller层布置在B服务器上,然后通过Dubbo等RPC进行整合,以此来完成整个项目的功能。
垂直拆分
垂直拆分是指按模块分,比如销售模块、仓库模块等分开,然后部署在不同的服务器上

微服务

一种架构设计方式
所谓微服务,就是每个单体服务(在分布式的模块上继续拆分,比分布式更细)只做单一、简单的功能,这个服务可以单独部署运行,服务之间通过RPC来相互交互,每个微服务都是由独立的团队开发、测试、部署、上线。

微服务可以布置在一台服务器上,也可以布置在多台服务器上,然后互相调用协作完成功能。

Zookeeper简介

Zookeeper用来注册服务和进行负载均衡,简单来说就是既然进行了分布式部署,那得有个东西来指路,调用者得请求得能找到对应处理的服务器,简单来说就是IP地址和服务名称的对应关系,也可以在如软件层面硬编码实现,但是如果服务器挂掉就没办法知道了,zookeeper就是提供这一套可以更加便捷的管理分布式系统,比如说服务器上下线的通知、负载均衡、统一配置管理等。
主要是包括文件系统通知机制

文件系统

存储节点 数据
类似于Unix文件系统的树形结构,每个节点称作一个ZNode,1MB默认大小

节点类型

临时有序(可以用来实现分布式锁)、临时无序
永久有序、永久无序

通知机制

主要是对服务器或者客户端进行通知,并且监听其状态

Zookeeper特性

  • 一个leader,多个follower的集群
  • 集群只要有半数以上包括半数就可正常服务,一般安装奇数台服务器
  • 全局数据一致,每个服务器都保存同样的数据,实时更新
  • 更新的请求顺序保持顺序(来自同一个服务器)
  • 数据更新的原子性,数据要么成功要么失败
  • 数据实时更新性很快

    应用场景

  • 统一命名服务(域名服务)

  • 统一配置管理(一个集群中的所有配置都一致,且也要实时更新同步)
  • 将配置信息写入ZooKeeper上的一个Znode,各个客户端服务器监听这个Znode。一旦Znode中的数据被修改,ZooKeeper将通知各个客户端服务器
  • 统一集群管理(掌握实时状态)
  • 将节点信息写入ZooKeeper上的一个ZNode。监听ZNode获取实时状态变化
  • 服务器节点动态上下线
  • 软负载均衡(根据每个节点的访问数,让访问数最少的服务器处理最新的数据需求)

    安装步骤安装过程.txt

    zookeeper的选举

zookeeper运行机制

集群数量

至少需要三台服务器,因为是半数运行机制,向上取整,即1.5——2,2.5—3,如果是挂掉1台服务器,相当于没挂掉,挂掉两台才挂掉,如果选择四台的话,挂掉两台服务器就崩掉了,相当于第四台没有作用。

配置语法

  • server.<节点ID>=:<数据同步端口>:<选举端口>
  • 节点ID:1-125之间的数字 写在对应服务器节点的myid文件里
  • 数据同步端口:主从同步时数据传输的端口
  • 选举端口:选举leader节点de 端口
  • 集群配置image.png
    在zoo.cfg配置
    1. - dataDir对应的路径里有一个myid的文件<br />存储123 节点ID

    角色

    leader

    主节点 领导者,拥有投票权

    follower

    子节点 追随者
    拥有投票权

    observer

    次级子节点 观察者
    配置的话要在最后的3888端口后面加:observer
    没有投票权 一般情况下项目里是没有的 只有在集群压力过大时 才会上线

    查看角色

    ./bin/zkServer.sh status conf/zoo.cfg

    初始简单选举

    第一轮投票:

    每个节点都投票给自己,假设A B C三台服务器zookeeper先后启动,首先A启动,给自己投一票,B启动也给自己投一票,现在状态是C未启动,A B两台服务器各一票

    第二轮投票:

    每个节点投票给大于自己myid的,然后直接选举,大于当前节点总数的半数就当选
    A和B对比 发现同票 然后投给大于自己的 即A投给B,这时候B两票,已经等于半数了,所以B当选Leader,若C在再启动,那么C也是只能做Follower

    数据同步机制image.png

    **leader 接收到写请求,然后同步给各个子节点

**

  • follower接收到请求,返回给Leader

  • Leader广播proposal请求给follower

  • follower回复ACK信息给leader

  • leader接收到半数以上的ACK认为请求处理完成然后就提交

    宕机后的数据同步

    ZXID事务

  • ZXID是一个64位长度的数字,低32位每次数据变更,就简单加1,高32位是按leader周期来编号的

  • 每次leader变更时,就从本地事务日志取出ZXID,然后将高32位解析出来加1,低32位全部置零,重新开始
  • 保证ZXID的唯一性 和递增

    宕机后的选举:

    假设12345,此时3为leader且宕机,这个时候选出4为leader,然后执行了一些数据操作,这个时候3重启了,要进行重新选举,12345,那么还是4会当选为leader,3是不会参与选举的,因为它的ZXID小。

    数据同步:

    假设12345,此时2为leader且宕机,这个时候选出3为leader,然后执行了一些数据操作,这个时候2重启了,会发现自身ZXID和leader的相比较旧,就会与其他四台进行数据的同步

    分布式锁的实现

    原生JavaAPI实现


    简述:主要是依靠 zookeeper的节点,当一个客户端创建一个节点时,另外一个客户端无法创建同名的节点(相当于互斥),且临时节点会在结束后自动删除,不影响使用。
    A创建lock临时节点,然后看是否成功如果,当前只有这个lock节点那就成功获取了锁,要不然的话就继续监听自己的上一个节点,然后继续等待到能获取锁的时候。 ```java package case1;

import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat;

import java.io.FileInputStream; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch;

/**

  • @author ljp
  • @date 12/2/2021 */ public class DistributedLock { // Properties zoo=new Properties(); // FileInputStream zoos=new FileInputStream(“zoo.properties”); private final String connectString = “192.168.188.130:2181,192.168.188.129:2181,192.168.188.128:2181”; private final int sessionTimeout = 2000; private final ZooKeeper zk;

    private CountDownLatch connectLatch = new CountDownLatch(1); private CountDownLatch waitLatch = new CountDownLatch(1);

    private String waitPath; private String currentMode;

    public DistributedLock() throws IOException, InterruptedException, KeeperException {

    1. // 获取连接
    2. zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
    3. @Override
    4. public void process(WatchedEvent watchedEvent) {
    5. // connectLatch 如果连接上zk 可以释放
    6. if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected){
    7. connectLatch.countDown();
    8. }
    9. // waitLatch 需要释放
    10. if (watchedEvent.getType()== Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){
    11. waitLatch.countDown();
    12. }
    13. }
    14. });
    15. // 等待zk正常连接后,往下走程序
    16. connectLatch.await();
    17. // 判断根节点/locks是否存在
    18. Stat stat = zk.exists("/locks", false);
    19. if (stat == null) {
    20. // 创建一下根节点
    21. zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    22. }

    }

    // 对zk加锁 public void zklock() {

    1. // 创建对应的临时带序号节点
    2. try {
    3. currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    4. // wait一小会, 让结果更清晰一些
    5. Thread.sleep(10);
    6. // 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号前一个节点
    7. List<String> children = zk.getChildren("/locks", false);
    8. // 如果children 只有一个值,那就直接获取锁; 如果有多个节点,需要判断,谁最小
    9. if (children.size() == 1) {
    10. return;
    11. } else {
    12. Collections.sort(children);
    13. // 获取节点名称 seq-00000000
    14. String thisNode = currentMode.substring("/locks/".length());
    15. // 通过seq-00000000获取该节点在children集合的位置
    16. int index = children.indexOf(thisNode);
    17. // 判断
    18. if (index == -1) {
    19. System.out.println("数据异常");
    20. } else if (index == 0) {
    21. // 就一个节点,可以获取锁了
    22. return;
    23. } else {
    24. // 需要监听 他前一个节点变化
    25. waitPath = "/locks/" + children.get(index - 1);
    26. zk.getData(waitPath,true,new Stat());
    27. // 等待监听
    28. waitLatch.await();
    29. return;
    30. }
    31. }
  1. } catch (KeeperException e) {
  2. e.printStackTrace();
  3. } catch (InterruptedException e) {
  4. e.printStackTrace();
  5. }
  6. }
  7. // 解锁
  8. public void unZkLock() {
  9. // 删除节点
  10. try {
  11. zk.delete(this.currentMode,-1);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. } catch (KeeperException e) {
  15. e.printStackTrace();
  16. }
  17. }

}

  1. ```java
  2. package case1;
  3. import org.apache.zookeeper.KeeperException;
  4. import java.io.IOException;
  5. /**
  6. * @author ljp
  7. * @date 12/2/2021
  8. */
  9. public class DistributedLockTest {
  10. public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
  11. final DistributedLock lock1 = new DistributedLock();
  12. final DistributedLock lock2 = new DistributedLock();
  13. new Thread(new Runnable() {
  14. @Override
  15. public void run() {
  16. try {
  17. lock1.zklock();
  18. System.out.println("线程1 启动,获取到锁");
  19. Thread.sleep(5 * 1000);
  20. lock1.unZkLock();
  21. System.out.println("线程1 释放锁");
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. }).start();
  27. new Thread(new Runnable() {
  28. @Override
  29. public void run() {
  30. try {
  31. lock2.zklock();
  32. System.out.println("线程2 启动,获取到锁");
  33. Thread.sleep(5 * 1000);
  34. lock2.unZkLock();
  35. System.out.println("线程2 释放锁");
  36. } catch (InterruptedException e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. }).start();
  41. System.out.println("main线程 ");
  42. }
  43. }

curator分布式锁

注意:安装的zookeeper版本需和pom.xml对应,然后execlusion 掉curator的zookeeper

原生的 Java API 开发存在的问题

(1)会话连接是异步的,需要自己去处理。比如使用CountDownLatch
(2)Watch 需要重复注册,不然就不能生效
(3)开发的复杂性还是比较高的
(4)不支持多节点删除和创建。需要自己去递归

采用curator开源项目,即可以简单实现分布式锁,主要就是创建一个curator客户端,然后启动客户端,也就是封装好的zookeeper,然后acquire()获取锁和release()释放锁,方法都封装好了,使用非常简单。

  1. package case2;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  5. import org.apache.curator.retry.ExponentialBackoffRetry;
  6. public class CuratorLockTest {
  7. public static void main(String[] args) {
  8. // 创建分布式锁1
  9. InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
  10. // 创建分布式锁2
  11. InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
  12. new Thread(new Runnable() {
  13. @Override
  14. public void run() {
  15. try {
  16. lock1.acquire();
  17. System.out.println("线程1 获取到锁");
  18. lock1.acquire();
  19. System.out.println("线程1 再次获取到锁");
  20. Thread.sleep(5 * 1000);
  21. lock1.release();
  22. System.out.println("线程1 释放锁");
  23. lock1.release();
  24. System.out.println("线程1 再次释放锁");
  25. } catch (Exception e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. }).start();
  30. new Thread(new Runnable() {
  31. @Override
  32. public void run() {
  33. try {
  34. lock2.acquire();
  35. System.out.println("线程2 获取到锁");
  36. lock2.acquire();
  37. System.out.println("线程2 再次获取到锁");
  38. Thread.sleep(5 * 1000);
  39. lock2.release();
  40. System.out.println("线程2 释放锁");
  41. lock2.release();
  42. System.out.println("线程2 再次释放锁");
  43. } catch (Exception e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. }).start();
  48. }
  49. private static CuratorFramework getCuratorFramework() {
  50. ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
  51. CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.188.128:2181,192.168.188.129:2181,192.168.188.130:2181")
  52. .connectionTimeoutMs(2000)
  53. .sessionTimeoutMs(2000)
  54. .retryPolicy(policy).build();
  55. // 启动客户端
  56. client.start();
  57. System.out.println("zookeeper 启动成功");
  58. return client;
  59. }
  60. }