在分布式架构中,由于不同的机器(进程)会同时执行同一个业务逻辑,但是不同于单机模式,可以使用 sync 或 lock 来实现对线程的控制,而分布式锁,就是控制进程执行顺序的
MySQL 分布式锁
使用 for update 语句,将行锁住,实现分布式锁
select count(1) from distributed_lock where lock_name = 'LOCK' for update
- 在表中没有数据的情况下,我们先看一下 explain 上述语句的结果:

再看一下查询优化器优化上述语句的结果
select count(1) AS `count(1)` from `test`.`distributed_lock` `t` where (NULL = 'LOCK')
- 在表中只有一条数据的情况下,我们先看一下 explain 上述语句的结果:

再看一下查询优化器优化上述语句的结果
select count(1) AS `count(1)` from `test`.`distributed_lock` `t` where 1
我们可以看出,LOCK_NAME 虽然最了唯一索引,但是执行上述语句的时候,MySQL 的查询优化器并没有使用索引,而是使用了常量,而 for update 语句,一旦没有使用索引,那么 for update 就不会生效(也就是不能锁住行),那么这种实现就是有问题的,只有在数据量比较大的时候,才会使用到索引,那么这种实现的弊端就是,需要一张有很多数据的表
Redis 分布式锁
Redis 分布式锁,主要是保证原子性的操作,同时要保证 key 在业务期间不能过期
- ex(expire):设置键的过期时间
- nx(set if not exists):只在键不存在时,才对键进行设置操作
@Overridepublic boolean tryLock() {Jedis jedis = jedisPool.getResource();SetParams params = new SetParams();params.ex(5); // 5s 后会过期params.nx(); // 如果 key 不存在,则创建成功,否则,创建失败String value = UUID.randomUUID().toString();// 保证了原子性,将设置过期时间和设置值作为一步操作String result = jedis.set(LOCK_KEY, value, params);jedis.close();if ("OK".equals(result)) {// 拿到了锁threadLocal.set(value);// 启动一个线程,保证 key 在业务期间不过期if (isHappened.get()) {Thread thread = new Thread(new MyRunnable(jedisPool));thread.setDaemon(true);thread.start();isHappened.set(false);}return true;}return false;}@Overridepublic void lock() {if (!tryLock()) {lock();}}@Overridepublic void unlock() {// 这里使用 lua 脚本,保证原子性操作String script = "if redis.call(\"get\", KEYS[1]) == ARGV[1] then\n" +" return redis.call(\"del\", KEYS[1])\n" +"else\n" +" return 0\n" +"end";Jedis jedis = jedisPool.getResource();Object eval = jedis.eval(script, Collections.singletonList(LOCK_KEY), Collections.singletonList(threadLocal.get()));if (Integer.parseInt(eval.toString()) == 0) {jedis.close();}}
ZooKeeper
ZooKeeper是一个集中式服务,用于维护配置信息,命名,提供分布式同步和提供组服务。
- 数据发布与订阅(配置中心)
- 负载均衡
- 命名服务(Naming Service)
- 分布式通知 / 协调
- 集群管理与 Master 选举
- 分布式锁
- 分布式队列
分布式锁
核心是:临时节点 + 顺序节点 + Watch,每个服务器建立自己的临时节点,然后寻找所有顺序节点并排序,找出最小值,如果最小值就是自己维护的节点的话,就相当于得到了锁,如果不是的话,就阻塞,等待锁被释放,然后继续重复上述步骤

public class ZkLock implements Lock {
private static final String LOCK_NAME = "/LOCK";
private ThreadLocal<ZooKeeper> zk = new ThreadLocal<>();
private ThreadLocal<String> currentNodeName = new ThreadLocal<>();
public void init() {
try {
if (zk.get() == null)
zk.set(new ZooKeeper("localhost:2181", 5000, null));
Stat stat = zk.get().exists(LOCK_NAME, false);
if (stat == null) {
zk.get().create(LOCK_NAME, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public boolean tryLock() {
init();
String nodeName = LOCK_NAME + "/zk_";
try {
// /LOCK/zk_1
currentNodeName.set(zk.get().create(nodeName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL));
List<String> children = zk.get().getChildren(LOCK_NAME, false); // zk_1, zk_2, ...
Collections.sort(children);
String minNodeName = children.get(0);
if (currentNodeName.get().equals(LOCK_NAME + "/" + minNodeName)) {
return true;
}
int currentNodeIndex = children.indexOf(currentNodeName.get().substring(currentNodeName.get().lastIndexOf("/") + 1));
String prevNodeName = children.get(currentNodeIndex - 1);
CountDownLatch countDownLatch = new CountDownLatch(1);
zk.get().exists(LOCK_NAME + "/" + prevNodeName, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (Event.EventType.NodeDeleted.equals(watchedEvent.getType())) {
countDownLatch.countDown();
System.out.println("【" + Thread.currentThread().getName() + "】被唤醒...");
}
}
});
System.out.println("【" + Thread.currentThread().getName() + "】等待锁...");
countDownLatch.await();
return true;
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public void lock() {
if (!tryLock()) {
lock();
}
}
@Override
public void unlock() {
try {
zk.get().delete(currentNodeName.get(), -1);
currentNodeName.set(null);
zk.get().close();
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
分布式配置中心
核心思路:持久节点 + Watch
public class Config {
private Map<String, String> cache = new HashMap<>();
private CuratorFramework client;
private static final String CONFIG_PREFIX = "/CONFIG";
public Config() {
this.client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 5000));
this.client.start();
init();
}
public void init() {
try {
List<String> childrenNames = client.getChildren().forPath(CONFIG_PREFIX);
for (String childrenName : childrenNames) {
String value = new String(client.getData().forPath(CONFIG_PREFIX + "/" + childrenName));
cache.put(childrenName, value);
}
PathChildrenCache watcher = new PathChildrenCache(client, CONFIG_PREFIX, true);
watcher.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
String path = event.getData().getPath();
if (path.startsWith(CONFIG_PREFIX)) {
String name = path.replace(CONFIG_PREFIX + "/", "");
if (PathChildrenCacheEvent.Type.CHILD_ADDED.equals(event.getType()) ||
PathChildrenCacheEvent.Type.CHILD_UPDATED.equals(event.getType())) {
cache.put(name, new String(event.getData().getData()));
} else if (PathChildrenCacheEvent.Type.CHILD_REMOVED.equals(event.getType())) {
cache.remove(name);
}
}
}
});
watcher.start();
} catch (Exception e) {
e.printStackTrace();
}
}
public void save(String name, String value) {
try {
String configFullPath = CONFIG_PREFIX + "/" + name;
Stat stat = client.checkExists().forPath(configFullPath);
if (stat == null) {
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(configFullPath, value.getBytes());
} else {
client.setData().forPath(configFullPath, value.getBytes());
}
cache.put(name, value);
} catch (Exception e) {
e.printStackTrace();
}
}
public String get(String name) {
return cache.get(name);
}
public static void main(String[] args) throws InterruptedException {
Config config = new Config();
config.save("port", "8080");
while (true) {
System.out.println(config.get("port"));
TimeUnit.SECONDS.sleep(3);
}
}
}
