Zookeeper 非公平锁/公平锁/共享锁
Zookeeper分布式锁实战
Zookeeper 分布式锁加锁原理
如上实现方式在并发问题比较严重的情况下,性能会下降的比较厉害,主要原因是,所有的连接都在对同一个节点进行监听,当服务器检测到删除事件时,要通知所有的连接,所有的连接同时收到事件,再次并发竞争,这就是羊群效应。这种加锁方式是非公平锁的具体实现:如何避免呢,我们看下面这种方式。
如上借助于临时顺序节点,可以避免同时多个节点的并发竞争锁,缓解了服务端压力。这种实现方式所有加锁请求都进行排队加锁,是公平锁的具体实现。
前面这两种加锁方式有一个共同的特质,就是都是互斥锁,同一时间只能有一个请求占用,如果是大量的并发上来,性能是会急剧下降的,所有的请求都得加锁,那是不是真的所有的请求都需要加锁呢?答案是否定的,比如如果数据没有进行任何修改的话,是不需要加锁的,但是如果读数据的请求还没读完,这个时候来了一个写请求,怎么办呢?有人已经在读数据了,这个时候是不能写数据的,不然数据就不正确了。直到前面读锁全部释放掉以后,写请求才能执行,所以需要给这个读请求加一个标识(读锁),让写请求知道,这个时候是不能修改数据的。不然数据就不一致了。如果已经有人在写数据了,再来一个请求写数据,也是不允许的,这样也会导致数据的不一致,所以所有的写请求,都需要加一个写锁,是为了避免同时对共享数据进行写操作。
举个例子
1、读写并发不一致
2、双写不一致情况
Zookeeper 共享锁实现原理
下面分别用mysql代码和zookeeper代码实现并发抢商品场景:
配置
server.port=8080
spring.datasource.url=jdbc:mysql://localhost:3306/pro?serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=true
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
mybatis.configuration.map-underscore-to-camel-case=true
xml配置
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.34</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.8</version>
</dependency>
</dependencies>
mapper:
public interface ProductMapper {
@Select(" select * from product where id=#{id} ")
Product getProduct(@Param("id") Integer id);
@Update(" update product set stock=stock-1 where id=#{id} ")
int deductStock(@Param("id") Integer id);
}
实体类:
public class Product {
private Integer id;
private String productName;
private Integer stock;
private Integer version;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public Integer getStock() {
return stock;
}
public void setStock(Integer stock) {
this.stock = stock;
}
public Integer getVersion() {
return version;
}
public void setVersion(Integer version) {
this.version = version;
}
}
1.springboot+mysql抢优惠商品代码:
//场景:优惠活动送5个春节大礼包。
@Service
public class OrderService {
@Autowired
private ProductMapper productMapper;
@Autowired
private OrderMapper orderMapper;
@Transactional
public void reduceStock(Integer id) {
// 1. 获取库存
Product product = productMapper.getProduct(id);
// 模拟耗时业务处理
sleep(500); // 其他业务处理
if (product.getStock() <= 0) {
throw new RuntimeException("out of stock");
}
// 2. 减库存
int i = productMapper.deductStock(id);
if (i == 1) {
Order order = new Order();
order.setUserId(UUID.randomUUID().toString());
order.setPid(id);
orderMapper.insert(order);
} else {
throw new RuntimeException("deduct stock fail, retry.");
}
}
/**
* 模拟耗时业务处理
*
* @param wait
*/
public void sleep(long wait) {
try {
TimeUnit.MILLISECONDS.sleep(wait);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
分别启动了两个实例8000和8001,并用nginx进行代理转发,客户端统一请求到nginx 80端口。
表product设置了5个库存,用JMeter并发发送了10个请求,结果product表的stock为-5,超出了库存数量,出现超卖问题,不符合预期。
2.zookeeper抢优惠商品分布式锁代码:
@Configuration
public class CuratorCfg {
@Bean(initMethod = "start")
public CuratorFramework curatorFramework(){
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.109.200:2181", retryPolicy);
return client;
}
}
//场景:优惠活动送5个春节大礼包。
@RestController
public class TestController {
@Autowired
private OrderService orderService;
@Value("${server.port}")
private String port;
@Autowired
CuratorFramework curatorFramework;
@PostMapping("/stock/deduct")
public Object reduceStock(Integer id) throws Exception {
InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, "/product_" + id);
try {
// ...
interProcessMutex.acquire();
orderService.reduceStock(id);
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw e;
}
} finally {
interProcessMutex.release();
}
return "ok:" + port;
}
}
分别启动两个实例,用nginx进行代理转发,客户端统一请求到nginx
表product设置了5个库存,用JMeter并发发送了100个请求,结果只有5个请求抢到商品并添加到订单记录的,符合预期