Zookeeper 非公平锁/公平锁/共享锁

    Zookeeper分布式锁实战
    Zookeeper 分布式锁加锁原理
    image.png
    如上实现方式在并发问题比较严重的情况下,性能会下降的比较厉害,主要原因是,所有的连接都在对同一个节点进行监听,当服务器检测到删除事件时,要通知所有的连接,所有的连接同时收到事件,再次并发竞争,这就是羊群效应。这种加锁方式是非公平锁的具体实现:如何避免呢,我们看下面这种方式。
    image.png
    如上借助于临时顺序节点,可以避免同时多个节点的并发竞争锁,缓解了服务端压力。这种实现方式所有加锁请求都进行排队加锁,是公平锁的具体实现。
    前面这两种加锁方式有一个共同的特质,就是都是互斥锁,同一时间只能有一个请求占用,如果是大量的并发上来,性能是会急剧下降的,所有的请求都得加锁,那是不是真的所有的请求都需要加锁呢?答案是否定的,比如如果数据没有进行任何修改的话,是不需要加锁的,但是如果读数据的请求还没读完,这个时候来了一个写请求,怎么办呢?有人已经在读数据了,这个时候是不能写数据的,不然数据就不正确了。直到前面读锁全部释放掉以后,写请求才能执行,所以需要给这个读请求加一个标识(读锁),让写请求知道,这个时候是不能修改数据的。不然数据就不一致了。如果已经有人在写数据了,再来一个请求写数据,也是不允许的,这样也会导致数据的不一致,所以所有的写请求,都需要加一个写锁,是为了避免同时对共享数据进行写操作。
    举个例子
    1、读写并发不一致
    image.png
    2、双写不一致情况
    image.png
    Zookeeper 共享锁实现原理
    image.png
    下面分别用mysql代码和zookeeper代码实现并发抢商品场景:
    配置

    1. server.port=8080
    2. spring.datasource.url=jdbc:mysql://localhost:3306/pro?serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=true
    3. spring.datasource.username=root
    4. spring.datasource.password=root
    5. spring.datasource.driver-class-name=com.mysql.jdbc.Driver
    6. mybatis.configuration.map-underscore-to-camel-case=true

    xml配置

    1. <dependencies>
    2. <dependency>
    3. <groupId>mysql</groupId>
    4. <artifactId>mysql-connector-java</artifactId>
    5. <version>5.1.34</version>
    6. <scope>runtime</scope>
    7. </dependency>
    8. <dependency>
    9. <groupId>org.springframework.boot</groupId>
    10. <artifactId>spring-boot-starter-web</artifactId>
    11. </dependency>
    12. <dependency>
    13. <groupId>org.mybatis.spring.boot</groupId>
    14. <artifactId>mybatis-spring-boot-starter</artifactId>
    15. <version>2.1.1</version>
    16. </dependency>
    17. <dependency>
    18. <groupId>org.springframework.boot</groupId>
    19. <artifactId>spring-boot-starter-test</artifactId>
    20. <scope>test</scope>
    21. <exclusions>
    22. <exclusion>
    23. <groupId>org.junit.vintage</groupId>
    24. <artifactId>junit-vintage-engine</artifactId>
    25. </exclusion>
    26. </exclusions>
    27. </dependency>
    28. <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
    29. <dependency>
    30. <groupId>org.apache.curator</groupId>
    31. <artifactId>curator-recipes</artifactId>
    32. <version>5.0.0</version>
    33. <exclusions>
    34. <exclusion>
    35. <groupId>org.apache.zookeeper</groupId>
    36. <artifactId>zookeeper</artifactId>
    37. </exclusion>
    38. </exclusions>
    39. </dependency>
    40. <dependency>
    41. <groupId>org.apache.zookeeper</groupId>
    42. <artifactId>zookeeper</artifactId>
    43. <version>3.5.8</version>
    44. </dependency>
    45. </dependencies>

    mapper:

    1. public interface ProductMapper {
    2. @Select(" select * from product where id=#{id} ")
    3. Product getProduct(@Param("id") Integer id);
    4. @Update(" update product set stock=stock-1 where id=#{id} ")
    5. int deductStock(@Param("id") Integer id);
    6. }

    实体类:

    1. public class Product {
    2. private Integer id;
    3. private String productName;
    4. private Integer stock;
    5. private Integer version;
    6. public Integer getId() {
    7. return id;
    8. }
    9. public void setId(Integer id) {
    10. this.id = id;
    11. }
    12. public String getProductName() {
    13. return productName;
    14. }
    15. public void setProductName(String productName) {
    16. this.productName = productName;
    17. }
    18. public Integer getStock() {
    19. return stock;
    20. }
    21. public void setStock(Integer stock) {
    22. this.stock = stock;
    23. }
    24. public Integer getVersion() {
    25. return version;
    26. }
    27. public void setVersion(Integer version) {
    28. this.version = version;
    29. }
    30. }

    1.springboot+mysql抢优惠商品代码:

    1. //场景:优惠活动送5个春节大礼包。
    2. @Service
    3. public class OrderService {
    4. @Autowired
    5. private ProductMapper productMapper;
    6. @Autowired
    7. private OrderMapper orderMapper;
    8. @Transactional
    9. public void reduceStock(Integer id) {
    10. // 1. 获取库存
    11. Product product = productMapper.getProduct(id);
    12. // 模拟耗时业务处理
    13. sleep(500); // 其他业务处理
    14. if (product.getStock() <= 0) {
    15. throw new RuntimeException("out of stock");
    16. }
    17. // 2. 减库存
    18. int i = productMapper.deductStock(id);
    19. if (i == 1) {
    20. Order order = new Order();
    21. order.setUserId(UUID.randomUUID().toString());
    22. order.setPid(id);
    23. orderMapper.insert(order);
    24. } else {
    25. throw new RuntimeException("deduct stock fail, retry.");
    26. }
    27. }
    28. /**
    29. * 模拟耗时业务处理
    30. *
    31. * @param wait
    32. */
    33. public void sleep(long wait) {
    34. try {
    35. TimeUnit.MILLISECONDS.sleep(wait);
    36. } catch (InterruptedException e) {
    37. e.printStackTrace();
    38. }
    39. }
    40. }

    分别启动了两个实例8000和8001,并用nginx进行代理转发,客户端统一请求到nginx 80端口。
    表product设置了5个库存,用JMeter并发发送了10个请求,结果product表的stock为-5,超出了库存数量,出现超卖问题,不符合预期。

    2.zookeeper抢优惠商品分布式锁代码:

    1. @Configuration
    2. public class CuratorCfg {
    3. @Bean(initMethod = "start")
    4. public CuratorFramework curatorFramework(){
    5. RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    6. CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.109.200:2181", retryPolicy);
    7. return client;
    8. }
    9. }
    1. //场景:优惠活动送5个春节大礼包。
    2. @RestController
    3. public class TestController {
    4. @Autowired
    5. private OrderService orderService;
    6. @Value("${server.port}")
    7. private String port;
    8. @Autowired
    9. CuratorFramework curatorFramework;
    10. @PostMapping("/stock/deduct")
    11. public Object reduceStock(Integer id) throws Exception {
    12. InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, "/product_" + id);
    13. try {
    14. // ...
    15. interProcessMutex.acquire();
    16. orderService.reduceStock(id);
    17. } catch (Exception e) {
    18. if (e instanceof RuntimeException) {
    19. throw e;
    20. }
    21. } finally {
    22. interProcessMutex.release();
    23. }
    24. return "ok:" + port;
    25. }
    26. }

    分别启动两个实例,用nginx进行代理转发,客户端统一请求到nginx
    表product设置了5个库存,用JMeter并发发送了100个请求,结果只有5个请求抢到商品并添加到订单记录的,符合预期