微服务和分布式
分布式
一种系统部署方式,说白了就是把整个服务拆开然后部署在多台服务器上,来提高服务的性能,满足客户需要。
水平拆分
水平拆分就是分层,把dao层和service层布置在A服务器上,然后把controller层布置在B服务器上,然后通过Dubbo等RPC进行整合,以此来完成整个项目的功能。
垂直拆分
垂直拆分是指按模块分,比如销售模块、仓库模块等分开,然后部署在不同的服务器上
微服务
一种架构设计方式
所谓微服务,就是每个单体服务(在分布式的模块上继续拆分,比分布式更细)只做单一、简单的功能,这个服务可以单独部署运行,服务之间通过RPC来相互交互,每个微服务都是由独立的团队开发、测试、部署、上线。
微服务可以布置在一台服务器上,也可以布置在多台服务器上,然后互相调用协作完成功能。
Zookeeper简介
Zookeeper用来注册服务和进行负载均衡,简单来说就是既然进行了分布式部署,那得有个东西来指路,调用者得请求得能找到对应处理的服务器,简单来说就是IP地址和服务名称的对应关系,也可以在如软件层面硬编码实现,但是如果服务器挂掉就没办法知道了,zookeeper就是提供这一套可以更加便捷的管理分布式系统,比如说服务器上下线的通知、负载均衡、统一配置管理等。
主要是包括文件系统和通知机制。
文件系统
存储节点 数据
类似于Unix文件系统的树形结构,每个节点称作一个ZNode,1MB默认大小
节点类型
临时有序(可以用来实现分布式锁)、临时无序
永久有序、永久无序
通知机制
Zookeeper特性
- 一个leader,多个follower的集群
- 集群只要有半数以上包括半数就可正常服务,一般安装奇数台服务器
- 全局数据一致,每个服务器都保存同样的数据,实时更新
- 更新的请求顺序保持顺序(来自同一个服务器)
- 数据更新的原子性,数据要么成功要么失败
-
应用场景
统一命名服务(域名服务)
- 统一配置管理(一个集群中的所有配置都一致,且也要实时更新同步)
- 将配置信息写入ZooKeeper上的一个Znode,各个客户端服务器监听这个Znode。一旦Znode中的数据被修改,ZooKeeper将通知各个客户端服务器
- 统一集群管理(掌握实时状态)
- 将节点信息写入ZooKeeper上的一个ZNode。监听ZNode获取实时状态变化
- 服务器节点动态上下线
- 软负载均衡(根据每个节点的访问数,让访问数最少的服务器处理最新的数据需求)
安装步骤安装过程.txt
zookeeper的选举
zookeeper运行机制
集群数量
至少需要三台服务器,因为是半数运行机制,向上取整,即1.5——2,2.5—3,如果是挂掉1台服务器,相当于没挂掉,挂掉两台才挂掉,如果选择四台的话,挂掉两台服务器就崩掉了,相当于第四台没有作用。
配置语法
- server.<节点ID>=
:<数据同步端口>:<选举端口> - 节点ID:1-125之间的数字 写在对应服务器节点的myid文件里
- 数据同步端口:主从同步时数据传输的端口
- 选举端口:选举leader节点de 端口
- 集群配置

在zoo.cfg配置- 在dataDir对应的路径里有一个myid的文件<br />存储1、2、3 节点ID
角色
leader
主节点 领导者,拥有投票权follower
子节点 追随者
拥有投票权observer
次级子节点 观察者
配置的话要在最后的3888端口后面加:observer
没有投票权 一般情况下项目里是没有的 只有在集群压力过大时 才会上线查看角色
./bin/zkServer.sh status conf/zoo.cfg初始简单选举
第一轮投票:
每个节点都投票给自己,假设A B C三台服务器zookeeper先后启动,首先A启动,给自己投一票,B启动也给自己投一票,现在状态是C未启动,A B两台服务器各一票第二轮投票:
每个节点投票给大于自己myid的,然后直接选举,大于当前节点总数的半数就当选
A和B对比 发现同票 然后投给大于自己的 即A投给B,这时候B两票,已经等于半数了,所以B当选Leader,若C在再启动,那么C也是只能做Follower数据同步机制
**leader 接收到写请求,然后同步给各个子节点
**
follower接收到请求,返回给Leader
Leader广播proposal请求给follower
follower回复ACK信息给leader
leader接收到半数以上的ACK认为请求处理完成然后就提交
宕机后的数据同步
ZXID事务
ZXID是一个64位长度的数字,低32位每次数据变更,就简单加1,高32位是按leader周期来编号的
- 每次leader变更时,就从本地事务日志取出ZXID,然后将高32位解析出来加1,低32位全部置零,重新开始
- 保证ZXID的唯一性 和递增
宕机后的选举:
假设12345,此时3为leader且宕机,这个时候选出4为leader,然后执行了一些数据操作,这个时候3重启了,要进行重新选举,12345,那么还是4会当选为leader,3是不会参与选举的,因为它的ZXID小。数据同步:
假设12345,此时2为leader且宕机,这个时候选出3为leader,然后执行了一些数据操作,这个时候2重启了,会发现自身ZXID和leader的相比较旧,就会与其他四台进行数据的同步分布式锁的实现
原生JavaAPI实现
简述:主要是依靠 zookeeper的节点,当一个客户端创建一个节点时,另外一个客户端无法创建同名的节点(相当于互斥),且临时节点会在结束后自动删除,不影响使用。
A创建lock临时节点,然后看是否成功如果,当前只有这个lock节点那就成功获取了锁,要不然的话就继续监听自己的上一个节点,然后继续等待到能获取锁的时候。 ```java package case1;
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat;
import java.io.FileInputStream; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch;
/**
- @author ljp
@date 12/2/2021 */ public class DistributedLock { // Properties zoo=new Properties(); // FileInputStream zoos=new FileInputStream(“zoo.properties”); private final String connectString = “192.168.188.130:2181,192.168.188.129:2181,192.168.188.128:2181”; private final int sessionTimeout = 2000; private final ZooKeeper zk;
private CountDownLatch connectLatch = new CountDownLatch(1); private CountDownLatch waitLatch = new CountDownLatch(1);
private String waitPath; private String currentMode;
public DistributedLock() throws IOException, InterruptedException, KeeperException {
// 获取连接zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {// connectLatch 如果连接上zk 可以释放if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected){connectLatch.countDown();}// waitLatch 需要释放if (watchedEvent.getType()== Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){waitLatch.countDown();}}});// 等待zk正常连接后,往下走程序connectLatch.await();// 判断根节点/locks是否存在Stat stat = zk.exists("/locks", false);if (stat == null) {// 创建一下根节点zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}
}
// 对zk加锁 public void zklock() {
// 创建对应的临时带序号节点try {currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// wait一小会, 让结果更清晰一些Thread.sleep(10);// 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号前一个节点List<String> children = zk.getChildren("/locks", false);// 如果children 只有一个值,那就直接获取锁; 如果有多个节点,需要判断,谁最小if (children.size() == 1) {return;} else {Collections.sort(children);// 获取节点名称 seq-00000000String thisNode = currentMode.substring("/locks/".length());// 通过seq-00000000获取该节点在children集合的位置int index = children.indexOf(thisNode);// 判断if (index == -1) {System.out.println("数据异常");} else if (index == 0) {// 就一个节点,可以获取锁了return;} else {// 需要监听 他前一个节点变化waitPath = "/locks/" + children.get(index - 1);zk.getData(waitPath,true,new Stat());// 等待监听waitLatch.await();return;}}
} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}// 解锁public void unZkLock() {// 删除节点try {zk.delete(this.currentMode,-1);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}
}
```javapackage case1;import org.apache.zookeeper.KeeperException;import java.io.IOException;/*** @author ljp* @date 12/2/2021*/public class DistributedLockTest {public static void main(String[] args) throws InterruptedException, IOException, KeeperException {final DistributedLock lock1 = new DistributedLock();final DistributedLock lock2 = new DistributedLock();new Thread(new Runnable() {@Overridepublic void run() {try {lock1.zklock();System.out.println("线程1 启动,获取到锁");Thread.sleep(5 * 1000);lock1.unZkLock();System.out.println("线程1 释放锁");} catch (InterruptedException e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.zklock();System.out.println("线程2 启动,获取到锁");Thread.sleep(5 * 1000);lock2.unZkLock();System.out.println("线程2 释放锁");} catch (InterruptedException e) {e.printStackTrace();}}}).start();System.out.println("main线程 ");}}
curator分布式锁
注意:安装的zookeeper版本需和pom.xml对应,然后execlusion 掉curator的zookeeper
原生的 Java API 开发存在的问题
(1)会话连接是异步的,需要自己去处理。比如使用CountDownLatch
(2)Watch 需要重复注册,不然就不能生效
(3)开发的复杂性还是比较高的
(4)不支持多节点删除和创建。需要自己去递归
采用curator开源项目,即可以简单实现分布式锁,主要就是创建一个curator客户端,然后启动客户端,也就是封装好的zookeeper,然后acquire()获取锁和release()释放锁,方法都封装好了,使用非常简单。
package case2;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.locks.InterProcessMutex;import org.apache.curator.retry.ExponentialBackoffRetry;public class CuratorLockTest {public static void main(String[] args) {// 创建分布式锁1InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");// 创建分布式锁2InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");new Thread(new Runnable() {@Overridepublic void run() {try {lock1.acquire();System.out.println("线程1 获取到锁");lock1.acquire();System.out.println("线程1 再次获取到锁");Thread.sleep(5 * 1000);lock1.release();System.out.println("线程1 释放锁");lock1.release();System.out.println("线程1 再次释放锁");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.acquire();System.out.println("线程2 获取到锁");lock2.acquire();System.out.println("线程2 再次获取到锁");Thread.sleep(5 * 1000);lock2.release();System.out.println("线程2 释放锁");lock2.release();System.out.println("线程2 再次释放锁");} catch (Exception e) {e.printStackTrace();}}}).start();}private static CuratorFramework getCuratorFramework() {ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.188.128:2181,192.168.188.129:2181,192.168.188.130:2181").connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(policy).build();// 启动客户端client.start();System.out.println("zookeeper 启动成功");return client;}}
