编程题一:
在基于 Netty 的自定义RPC的案例基础上,进行改造。基于 Zookeeper 实现简易版服务的注册与发现机制
要求完成改造版本:
- 启动 2 个服务端,可以将IP及端口信息自动注册到 Zookeeper
- 客户端启动时,从Zookeeper中获取所有服务提供端节点信息,客户端与每一个服务端都建立连接
- 某个服务端下线后,Zookeeper注册列表会自动剔除下线的服务端节点,客户端与下线的服务端断开连接
- 服务端重新上线,客户端能感知到,并且与重新上线的服务端重新建立连接
编程题二:
基于作业一的基础上,实现基于 Zookeeper 的简易版负载均衡策略
要求完成改造版本:
- Zookeeper 记录每个服务端的最后一次响应时间,有效时间为 5秒,5s内如果该服务端没有新的请求,响应时间清零或失效
- 当客户端发起调用,每次都选择最后一次响应时间短的服务端进行服务调用,如果时间一致,随机选取一个服务端进行调用,从而实现负载均衡
编程题三:
基于Zookeeper实现简易版配置中心
要求实现以下功能:
- 创建一个 Web 项目,将数据库连接信息交给Zookeeper配置中心管理,即:当项目Web项目启动时,从 Zookeeper 进行 MySQL 配置参数的拉取
- 要求项目通过数据库连接池访问MySQL(连接池可以自由选择熟悉的)
- 当 Zookeeper 配置信息变化后Web项目自动感知,正确释放之前连接池,创建新的连接池
作业资料说明:
1、提供资料:3个代码工程、验证及讲解视频。(仓库中只有本次作业内容)
2、讲解内容包含:题目分析、实现思路、代码讲解。
3、效果视频验证:
3.1 作业1:服务端的上线与下线,客户端能动态感知,并能重新构成负载均衡。
3.2 作业2:作业完成情况下,选择性能好的服务器处理(响应时间短的服务器即为性能好)。Zookeeper记录客户端响应有效时间为5s,超时判定该客户端失效。
3.3 作业3:Zookeeper配置中心,web访问数据库需要从Zookeeper获取连接资源。当Zookeeper配置发生改变,web自动切换到新的连接资源,保持正常访问。
作业1
新增 NodeChangeListener 类
public interface NodeChangeListener {
/**
*
* @param serviceName 服务名称
* @param serviceList 服务名称对应节点下的所有子节点, 目前没有用到
* @param pathChildrenCacheEvent
*/
void notify(String serviceName, List<String> serviceList, PathChildrenCacheEvent pathChildrenCacheEvent);
}
将 zk 的行为抽象成接口
public interface RpcRegistryHandler extends NodeChangeListener {
/**
* 服务端进行调用
*
* @param service
* @param ip
* @param port
* @return
*/
boolean registry(final String service, final String ip, final int port);
/**
* 客户端进行调用
*
* @param service
* @return
*/
List<String> discovery(final String service);
void addListener(NodeChangeListener service);
void destroy();
}
ConfigKeeper 配置类
public class ConfigKeeper {
/**
* netty 的端口号
*/
private int nettyPort;
/**
* zk 地址: ip + 端口
*/
private String zkAddr;
/**
* 主动上报时间,单位 秒
*/
private int interval;
/**
* 区分是客户端 还是 server 端, true 是服务端, false 是客户端
*/
private boolean providerSide;
// 单例类,setter 和 getter 方法
}
新增 RpcResponse 类
package com.lagou;
public class RpcResponse {
/**
* 响应ID
*/
private String requestId;
/**
* 错误信息
*/
private String error;
/**
* 返回的结果
*/
private Object result;
// setter 和 getter 方法, toString方法
}
RpcServerHandler 类,这次主要对 channelRead 的方法内容作了调整
/**
* 服务端将数据 写入 客户端, 继续传递下去
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 222222
RpcRequest request = (RpcRequest) msg;
final RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setRequestId(request.getRequestId());
System.out.println("111 接收到" + request.getRequestId());
rpcResponse.setResult(handler(request));
// 3333333
ctx.writeAndFlush(rpcResponse);
}
服务端 rpc -server 用到的配置类 RpcServerConfig
public class RpcServerConfig {
private String nettyHost;
private int nettyPort;
private int delay;
/**
* 是否是服务端
*/
private boolean providerSide;
/**
* 应用的名称
*/
private String applicationName;
private Map<String, Class> services;
// setter 和 getter 方法
}
RpcServer 类
自身 implements InitializingBean, DisposableBean 接口
Autowired 了 RpcRegistryFactory 对象
主要关注 afterPropertiesSet 和 destroy 方法即可
@Override
public void afterPropertiesSet() throws Exception {
this.initRpcServerConfig();
this.startServer();
}
@Override
public void destroy() {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
这里牵涉到了RpcRegistryFactory
/**
* 注册中心工厂类
*/
@Component
public class RpcRegistryFactory implements FactoryBean<RpcRegistryHandler>, DisposableBean {
private RpcRegistryHandler rpcRegistryHandler;
@Override
public RpcRegistryHandler getObject() {
if (this.rpcRegistryHandler == null) {
rpcRegistryHandler = new ZkRegistryHandler(ConfigKeeper.getInstance().getZkAddr());
}
return rpcRegistryHandler;
}
@Override
public Class<?> getObjectType() {
System.out.println("RpcRegistryFactory ### getObjectType.....");
return RpcRegistryHandler.class;
}
@Override
public void destroy() {
System.out.println("RpcRegistryFactory ### destroy.....");
rpcRegistryHandler.destroy();
}
}
用于设计参数的 ProviderLoader
public class ProviderLoader {
private ProviderLoader() {
}
/**
* 返回类的全路径名 -> 该类的 class
* @return
*/
public static Map<String, Class> getInstanceCacheMap() {
Map<String, Class> services = new HashMap<>();
services.put(IUserService.class.getName(), IUserService.class);
return services;
}
}
ZkRegistryHandler 是对 RpcRegistryHandler 接口的 zk 实现。
public class ZkRegistryHandler implements RpcRegistryHandler {
private static final String ZK_PATH_SPLITER = "/";
private static final String LAGOU_EDU_RPC_ZK_ROOT = ZK_PATH_SPLITER + "lg-rpc-provider" + ZK_PATH_SPLITER;
private List<NodeChangeListener> listenerList = new ArrayList<>();
private final String url;
private final CuratorFramework client;
private volatile boolean closed;
/**
* 子节点列表
*/
private List<String> serviceList;
private static final ScheduledExecutorService REPORT_WORKER = Executors.newScheduledThreadPool(5);
public ZkRegistryHandler(final String zkPath) {
url = zkPath;
this.client = CuratorFrameworkFactory.builder()
.connectString(zkPath)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
if (ConnectionState.CONNECTED.equals(connectionState)) {
System.out.println("注册中心连接成功");
}
}
});
client.start();
// 定时上报
final ConfigKeeper configKeeper = ConfigKeeper.getInstance();
final boolean providerSide = configKeeper.isProviderSide();
final int interval = configKeeper.getInterval();
if (!providerSide && interval > 0) {
REPORT_WORKER.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println("我是一个定时任务");
// RequestMetri
}
}, interval, interval, TimeUnit.SECONDS);
}
}
/**
* 服务端注册用到的方法
* @param serviceName
* @param nettyHost
* @param nettyPort
* @return
*/
@Override
public boolean registry(final String serviceName, final String nettyHost, final int nettyPort) {
String zkPath = providePath(serviceName);
if (!exists(zkPath)) {
create(zkPath, false);
}
// /lg-rpc-provider/com.lagou.server.IUserService/provider/localhost:8999
String instancePath = zkPath + ZK_PATH_SPLITER + nettyHost + ":" + nettyPort;
create(instancePath, true);
return true;
}
/**
* 客户端查找服务的方法
*
* @param serviceName
* @return
*/
@Override
public List<String> discovery(final String serviceName) {
final String path = providePath(serviceName);
if (serviceList == null || serviceList.isEmpty()) {
System.out.println("首次查找地址");
try {
serviceList = client.getChildren().forPath(path);
} catch (Exception e) {
e.printStackTrace();
}
}
this.registryWatch(serviceName, path);
return serviceList;
}
@Override
public void addListener(NodeChangeListener listener) {
listenerList.add(listener);
}
@Override
public void destroy() {
client.close();
}
@Override
public void notify(String children, List<String> serviceList, PathChildrenCacheEvent pathChildrenCacheEvent) {
for (NodeChangeListener nodeChangeListener : listenerList) {
nodeChangeListener.notify(children, serviceList, pathChildrenCacheEvent);
}
}
private void create(final String path, final boolean ephemeral) {
CreateMode createMode;
if (ephemeral) {
createMode = CreateMode.EPHEMERAL;
} else {
createMode = CreateMode.PERSISTENT;
}
try {
client.create().creatingParentsIfNeeded().withMode(createMode).forPath(path);
} catch (KeeperException.NodeExistsException e) {
// do nothing
System.out.println("该路径已存在" + path);
}
catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
private boolean exists(final String path) {
try {
if (client.checkExists().forPath(path) != null) {
return true;
}
} catch (KeeperException.NoNodeException e) {
// do nothing
} catch (Exception e) {
throw new RuntimeException(e);
}
return false;
}
/**
* 设置监听的方法
*
* @param serviceName
* @param path
*/
private void registryWatch(final String serviceName, final String path) {
PathChildrenCache nodeCache = new PathChildrenCache(client, path, true);
try {
nodeCache.getListenable().addListener((client, pathChildrenCacheEvent) -> {
// 更新本地緩存
serviceList = client.getChildren().forPath(path);
listenerList.forEach(nodeChangeListener -> {
System.out.println("节点变化");
nodeChangeListener.notify(serviceName, serviceList, pathChildrenCacheEvent);
});
});
nodeCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
} catch (Exception e) {
}
}
/**
* 返回 /lg-rpc-provider/com.lagou.server.IUserService/provider
* @param serviceName
* @return
*/
private String providePath(String serviceName) {
return LAGOU_EDU_RPC_ZK_ROOT + serviceName + ZK_PATH_SPLITER + "provider";
}
private String metricsPath() {
return LAGOU_EDU_RPC_ZK_ROOT + "metrics";
}
}
rpc-server 的启动类
@SpringBootApplication
public class MyApplication {
public static void main(String[] args) {
// ["localhost:2181", "8999"]
// ["localhost:2181", "9000"]
final String zkPath = args[0];
final int nettyPort = Integer.parseInt(args[1]);
// 将IP及端口信息自动注册到 Zookeeper
ConfigKeeper configKeeper = ConfigKeeper.getInstance();
configKeeper.setProviderSide(true);
configKeeper.setInterval(5);
configKeeper.setNettyPort(nettyPort);
configKeeper.setZkAddr(zkPath);
System.out.println(configKeeper);
SpringApplication.run(MyApplication.class, args);
// 可以通过 ls /lg-rpc-provider/com.lagou.server.IUserService/provider 查看节点信息
}
}
分别为 netty 启动 8888 和 8900 端口
接下来讲解 rpc-client
UserClientHandler 类可以复用
新增 RpcClient
主要对外暴露了 initClient 和 send 方法
// 2. 初始化netty客户端(创建连接池 bootstrap, 设置 BootStrap 连接服务器)
public void initClient(String serviceClassName) throws InterruptedException {
// 创建连接池
this.group = new NioEventLoopGroup();
// 创建客户端启动类
Bootstrap bootstrap = new Bootstrap();
// 配置启动引导类
bootstrap.group(group)
// 通道类型为 NIO
.channel(NioSocketChannel.class)
// 设置请求协议为 tcp
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
// 监听 channel 并初始化
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
// 获取管道对象
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new RpcEncoder(RpcRequest.class, new JSONSerializer()));
pipeline.addLast(new RpcDecoder(RpcResponse.class, new JSONSerializer()));
// 自定义事件处理器
pipeline.addLast(new UserClientHandler());
}
});
this.channel = bootstrap.connect(this.nettyIp, this.nettyPort).sync().channel();
if (!isValidate()) {
close();
return;
}
System.out.println("启动客户端" + serviceClassName + ", ip = " + this.nettyIp + ", port = " + nettyPort);
}
public Object send(RpcRequest request) throws InterruptedException, ExecutionException {
// 统计请求时间
RequestMetrics.getInstance().put(nettyIp, this.nettyPort, request.getRequestId());
return this.channel.writeAndFlush(request).sync().get();
}
新增 RpcConsumer 类
主要有用的方法有构造方法,createProxy的方法,notify为类自身 实现 NodeChangeListener 接口的方法(因为构造时会this.rpcRegistryHandler.addListener(this);)。
public class RpcConsumer implements NodeChangeListener {
private final RpcRegistryHandler rpcRegistryHandler;
private final Map<String, Class> serviceMap;
/**
* 服务名 -> List<RpcClient>
*/
private final Map<String, List<RpcClient>> CLIENT_POOL = new HashMap<>();
private LoadBalanceStrategy balanceStrategy = new RandomLoadBalance();
/**
* 初始化
* @param rpcRegistryHandler
* @param instanceCacheMap
*/
public RpcConsumer(final RpcRegistryHandler rpcRegistryHandler, final Map<String, Class> instanceCacheMap) {
this.rpcRegistryHandler = rpcRegistryHandler;
this.serviceMap = instanceCacheMap;
// 开始自动注册消费者逻辑: accept 方法
serviceMap.forEach((className, clazz) -> {
List<RpcClient> rpcClients = CLIENT_POOL.get(className);
if (rpcClients == null) {
rpcClients = new ArrayList<>();
}
// 127.0.0.1:8999 127.0.0.1:9000
final List<String> discovery = this.rpcRegistryHandler.discovery(className);
for (String s : discovery) {
// s -> rpcClient
final String[] split = s.split(":");
String nettyIp = split[0];
int nettyPort = Integer.parseInt(split[1]);
final RpcClient rpcClient = new RpcClient(nettyIp, nettyPort);
try {
rpcClient.initClient(className);
} catch (InterruptedException e) {
e.printStackTrace();
}
rpcClients.add(rpcClient);
CLIENT_POOL.put(className, rpcClients);
}
});
this.rpcRegistryHandler.addListener(this);
}
// 4. 编写一个方法,使用 jdk 动态代理对象
@SuppressWarnings("unchecked")
public <T> T createProxyEnhance(final Class<T> serverClass) {
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[]{serverClass}, (proxy, method, args) -> {
final String serverClassName = serverClass.getName();
// 封装
final RpcRequest request = new RpcRequest();
final String requestId = UUID.randomUUID().toString().substring(0, 7);
String className = method.getDeclaringClass().getName();
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
request.setRequestId(requestId);
request.setClassName(className);
request.setMethodName(methodName);
request.setParameterTypes(parameterTypes);
request.setParameters(args);
System.out.println("******************************\n请求id = " + requestId + ", 请求方法名 = " + methodName + ", 请求参数 = " + Arrays.toString(args));
RpcClient rpcClient = balanceStrategy.route(CLIENT_POOL, serverClassName);
if (rpcClient == null) {
System.out.println("没找到对应服务端,返 NULL");
return null;
}
System.out.println(request);
// request 最终会客户端发送给服务端进行消费
return rpcClient.send(request);
});
}
/**
* 监听临时节点的变化
*
* @param service 服务名称
* @param serviceList 服务名称对应节点下的所有子节点
* @param pathChildrenCacheEvent
*/
@Override
public void notify(final String service, final List<String> serviceList,
final PathChildrenCacheEvent pathChildrenCacheEvent) {
// 取出变化的节点名称, 例如为 /lg-rpc-provider/com.lagou.server.IUserService/provider/localhost:9000
final String path = pathChildrenCacheEvent.getData().getPath();
System.out.println("变化节点的路径: " + path + ", 变化的类型: " + pathChildrenCacheEvent.getType());
// 分离出 ip:port 的组合。
final String instanceConfig = path.substring(path.lastIndexOf("/") + 1);
System.out.println("instanceConfig: " + instanceConfig);
final String[] address = instanceConfig.split(":");
System.out.println("address: " + address);
final String nettyIp = address[0];
final int nettyPort = Integer.parseInt(address[1]);
List<RpcClient> rpcClients = CLIENT_POOL.get(service);
switch (pathChildrenCacheEvent.getType()) {
// 增加节点
case CHILD_ADDED:
case CONNECTION_RECONNECTED:
if (rpcClients == null) {
rpcClients = new ArrayList<>();
}
final RpcClient rpcClient = new RpcClient(nettyIp, nettyPort);
try {
rpcClient.initClient(service);
} catch (InterruptedException e) {
e.printStackTrace();
}
rpcClients.add(rpcClient);
// 节点耗时统计
RequestMetrics.getInstance().addNode(nettyIp, nettyPort);
System.out.println("新增节点" + instanceConfig);
break;
// 增加节点
case CHILD_REMOVED:
case CONNECTION_SUSPENDED:
case CONNECTION_LOST:
if (rpcClients != null) {
for (RpcClient client : rpcClients) {
if (client.getNettyIp().equals(nettyIp) && client.getNettyPort() == nettyPort) {
rpcClients.remove(client);
// 节点耗时统计
RequestMetrics.getInstance().remoteNode(nettyIp, nettyPort);
System.out.println("移除节点" + instanceConfig);
break;
}
}
}
break;
}
}
}
最后再讲讲 ConsumerBootStrap, 该类基本没啥改动
public class ConsumerBootStrap {
public static void main(final String[] args) throws Exception {
final ConfigKeeper configKeeper = ConfigKeeper.getInstance();
configKeeper.setZkAddr(args[0]);
// 之后会启动一个定时的线程池,每 5s 上传到注册中心
configKeeper.setInterval(5);
configKeeper.setProviderSide(false);
final RpcRegistryHandler rpcRegistryHandler = new ZkRegistryHandler(configKeeper.getZkAddr());
System.out.println("客户端 Zookeeper session established.");
// 最后一步
final RpcConsumer consumer = new RpcConsumer(rpcRegistryHandler, ProviderLoader.getInstanceCacheMap());
final IUserService userService = consumer.createProxyEnhance(IUserService.class);
while (true) {
Thread.sleep(4900);
final String result = userService.sayHello("are you ok?");
// 恒为 null
System.out.println("返回 = " + result);
}
}
}
作业2
1.消费者每次请求完成时更新最后一次请求耗时和系统时间
这部分工作主要在客户端做。
首先介绍一下这次用到的两个类
package com.lagou.boot;
public class Metrics {
private String nettyIp;
private int nettyPort;
private long start;
private Long cost;
public Metrics(String nettyIp, int nettyPort, long start, Long cost) {
this.nettyIp = nettyIp;
this.nettyPort = nettyPort;
this.start = start;
this.cost = cost;
}
public Metrics(String nettyIp, int nettyPort, long start) {
this(nettyIp, nettyPort, start, null);
}
public String getNettyIp() {
return nettyIp;
}
public void setNettyIp(String nettyIp) {
this.nettyIp = nettyIp;
}
public int getNettyPort() {
return nettyPort;
}
public void setNettyPort(int nettyPort) {
this.nettyPort = nettyPort;
}
public long getStart() {
return start;
}
public void setStart(long start) {
this.start = start;
}
public Long getCost() {
return cost;
}
public void setCost(Long cost) {
this.cost = cost;
}
}
RequestMetrics 类
COST_TIME_MAP变量 ip:端口 -》 耗时
REQUEST_ID_MAP变量 requestId -> ip + 端口 + 起始时间戳 + 耗时
calculate 方法用于 根据requestId 进行耗时统计
统计请求时间 在RpcClient的 send 方法中进行
public class RequestMetrics {
/**
* ip:端口 -》 耗时
*/
private static final ConcurrentHashMap<String, Long> COST_TIME_MAP = new ConcurrentHashMap<>();
/**
* requestId -> ip + 端口 + 起始时间戳 + 耗时
* 每个 requestId 用完一次后就会被销毁
*/
private static final ConcurrentHashMap<String, Metrics> REQUEST_ID_MAP = new ConcurrentHashMap<>();
private static final RequestMetrics requestMetrics = new RequestMetrics();
public ConcurrentHashMap<String, Long> getMetricMap() {
return COST_TIME_MAP;
}
private RequestMetrics() {
}
public static RequestMetrics getInstance() {
return requestMetrics;
}
public void addNode(String nettyIp, int nettyPort) {
COST_TIME_MAP.put(nettyIp + ":" + nettyPort, 0L);
}
public void remoteNode(String nettyIp, int nettyPort) {
COST_TIME_MAP.remove(nettyIp + ":" + nettyPort);
}
/**
* 响应时放入, 根据requestId 进行耗时统计
* @param requestId
*/
public void calculate(String requestId) {
final Metrics metrics = REQUEST_ID_MAP.get(requestId);
Long cost = System.currentTimeMillis() - metrics.getStart();
COST_TIME_MAP.put(metrics.getNettyIp() + ":" + metrics.getNettyPort(), cost);
REQUEST_ID_MAP.remove(requestId);
}
/**
* 获取所有节点耗时统计
*/
public List<Metrics> getAllInstances() {
List<Metrics> result = new ArrayList<>();
COST_TIME_MAP.forEach((url, aLong) -> {
String[] split = url.split(":");
result.add(new Metrics(split[0], Integer.parseInt(split[1]), aLong));
});
return result;
}
/**
* 请求时放入
* @param nettyIp
* @param nettyPort
* @param requestId
*/
public void put(String nettyIp, int nettyPort, String requestId) {
REQUEST_ID_MAP.put(requestId, new Metrics(nettyIp, nettyPort, System.currentTimeMillis(), null));
}
}
2.消费者定时在启动时创建定时线程池,每隔5s自动上报,更新Zookeeper临时节点的值
ConsumerBootStrap 入口有一个参数配置
// 之后会启动一个定时的线程池,每 5s 上传到注册中心
configKeeper.setInterval(5);
ZkRegistryHandler 会开启一个 ScheduledExecutorService 线程池服务
RequestMetrics 的
// 定时上报
final ConfigKeeper configKeeper = ConfigKeeper.getInstance();
final boolean providerSide = configKeeper.isProviderSide();
final int interval = configKeeper.getInterval();
if (!providerSide && interval > 0) {
REPORT_WORKER.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println("我是一个定时任务");
// ...
}
}, interval, interval, TimeUnit.SECONDS);
}
- 每次上报时判断当前时间距离最后一次请求是否超过5s,超过5s则需要删除Zookeeper上面的内容
这里介绍下 RequestMetrics 的 getAllInstances() 方法, 如果 5 秒内没有响应清空请求时间。
/**
* 获取所有节点耗时统计
*/
public List<Metrics> getAllInstances() {
List<Metrics> result = new ArrayList<>();
COST_TIME_MAP.forEach((url, aLong) -> {
String[] split = url.split(":");
result.add(new Metrics(split[0], Integer.parseInt(split[1]), aLong));
});
return result;
}
接下来简单说一下负载均衡策略,这里主要涉及到了使用那个客户端进行服务的请求。
public abstract class AbstractLoadBalanceStrategy implements LoadBalanceStrategy{
@Override
public RpcClient route(Map<String, List<RpcClient>> clientPool, String serverClassName) {
List<RpcClient> rpcClients = clientPool.get(serverClassName);
if (null == rpcClients) return null;
return doSelect(rpcClients);
}
protected abstract RpcClient doSelect(List<RpcClient> rpcClients);
}
MinCostLoadBalance (该类未经验证)
public class MinCostLoadBalance extends AbstractLoadBalanceStrategy {
@Override
protected RpcClient doSelect(final List<RpcClient> rpcClients) {
ConcurrentHashMap<String, Long> metricMap = RequestMetrics.getInstance().getMetricMap();
RpcClient minCostRpcClient = rpcClients.get(0);
final Long minLong = metricMap.get(minCostRpcClient.getNettyIp() + minCostRpcClient.getNettyPort());
for (int i = 1; i < rpcClients.size(); i++) {
RpcClient rpcClient = rpcClients.get(i);
String nettyIp = rpcClient.getNettyIp();
int nettyPort = rpcClient.getNettyPort();
// 取出最小响应时间的客户端,并进行调用
Long aLong = metricMap.get(nettyIp + nettyPort);
if (aLong != null && aLong < minLong) {
minCostRpcClient = rpcClient;
}
}
return minCostRpcClient;
}
}
RandomLoadBalance (该类未经验证)
public class RandomLoadBalance extends AbstractLoadBalanceStrategy {
private final Random random = new Random();
@Override
protected RpcClient doSelect(List<RpcClient> rpcClients) {
int size = rpcClients.size();
int index = random.nextInt(size);
return rpcClients.get(index);
}
}
作业3
以下项目主要使用了 commons-dbcp + fastjson + apache.curator 技术进行实现。
这里会通过create [-s][-e] path data acl
命令创建节点:
建立所需节点
我会将数据库配置的用户名和密码等信息写入/dbConfig/lagou.config.DbConfig
节点中。
先建立父节点
create /dbConfig ""
然后若不存在临时节点则重新创建
# 向 /dbConfig/lagou.config.DbConfig 中写入配置信息
create -e /dbConfig/lagou.config.DbConfig {"username":"root","password":"123456","url":"jdbc:mysql://localhost:3306/aaaa?serverTimezone=UTC"}
# 查看是否能正常获取节点信息
get /dbConfig/lagou.config.DbConfig
更改数据
# 更改数据库为 aaaa
set /dbConfig/lagou.config.DbConfig {"username":"root","password":"123456","url":"jdbc:mysql://localhost:3306/aaaa?serverTimezone=UTC"}
# 更改数据库为 bbbb
set /dbConfig/lagou.config.DbConfig {"username":"root","password":"123456","url":"jdbc:mysql://localhost:3306/bbbb?serverTimezone=UTC"}
创建Java类
- 创建工具类 RuntimeContext
@Component
public class RuntimeContext implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (RuntimeContext.applicationContext == null) {
RuntimeContext.applicationContext = applicationContext;
}
}
//获取applicationContext
private static ApplicationContext getApplicationContext() {
return applicationContext;
}
//通过name获取Bean
public static Object getBean(String name) {
return getApplicationContext().getBean(name);
}
// ...
}
- 实体类
package lagou.config;
public class DbConfig {
private String url;
private String username;
private String password;
// setter getter 方法
}
- 创建 MyDataSource 自定义数据源,我们重写的dbcp中的类BasicDataSource,我们将其全部拷贝了出来,然后重命名为MyDataSource类,然后在其中修改了以下内容
3.1将 UNKNOWN_TRANSACTIONISOLATION 的值改为 -1. 否则这个内部变量会找不到
/**
* The default TransactionIsolation state of connections created by this pool.
*/
protected volatile int defaultTransactionIsolation = PoolableConnectionFactory.UNKNOWN_TRANSACTIONISOLATION;
3.2jdk 1.7 之后需要实现该方法 getParentLogger()
@Override
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return null;
}
3.3 修改已有的 createDataSource() 方法,删除这几行代码
3.4 新增 changeDataSource 方法
public static void changeDataSource() {
MyDataSource dataSource = (MyDataSource) RuntimeContext.getBean("dataSource");
try {
dataSource.close();
dataSource.createDataSource();
} catch (SQLException e) {
e.printStackTrace();
}
}
- 新建 InitListener 类,该类实现了ServletContextListener 来对Zookeeper节点 /db/url 的监听
public class InitListener implements ServletContextListener {
private static final String CONNENT_ADDR = "localhost:2181";
private static final String PATH = "/dbConfig";
private static final String SUB_PATH = PATH + "/" + DbConfig.class.getName();
@Override
public void contextInitialized(ServletContextEvent sce) {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(CONNENT_ADDR)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
curatorFramework.start();
PathChildrenCache nodeCache = new PathChildrenCache(curatorFramework, "/dbConfig", true);
try {
nodeCache.getListenable().addListener((client, pathChildrenCacheEvent) -> {
System.out.println(pathChildrenCacheEvent.getType());
if (PathChildrenCacheEvent.Type.CHILD_UPDATED.equals(pathChildrenCacheEvent.getType())) {
final ChildData data = pathChildrenCacheEvent.getData();
if (data != null) {
final String path = data.getPath();
System.out.println(path);
System.out.println(SUB_PATH);
if (path.equals(SUB_PATH)) {
MyDataSource datasource = (MyDataSource) RuntimeContext.getBean("dataSource");
final DbConfig dbConfig = JSON.parseObject(new String(data.getData()), DbConfig.class);
System.out.println(dbConfig.toString());
datasource.setUrl(dbConfig.getUrl());
datasource.setUsername(dbConfig.getUsername());
datasource.setPassword(dbConfig.getPassword());
MyDataSource.changeDataSource();
}
}
}
});
nodeCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
}
}
- 修改spring boot 启动类,注册 InitListener ,配置 我们自定义的 DataSource,建立一个query方法(可通过/query 进行访问)暴露出去。
@SpringBootApplication
@RestController
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
@RequestMapping("/query")
public String query() {
String sql = "select id from `info` limit 1";
return jdbcTemplate.queryForObject(sql, String.class);
}
@Bean
public ServletListenerRegistrationBean servletListenerRegistrationBean() {
ServletListenerRegistrationBean servletListenerRegistrationBean = new ServletListenerRegistrationBean();
servletListenerRegistrationBean.setListener(new InitListener());
return servletListenerRegistrationBean;
}
@Bean
public DataSource dataSource(@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
MyDataSource dataSource = new MyDataSource();
dataSource.setUrl(url);
dataSource.setUsername(username);
dataSource.setPassword(password);
return dataSource;
}
@Autowired
private JdbcTemplate jdbcTemplate;
}
6.application.properties 配置
server.port=80
spring.datasource.url=jdbc:mysql://localhost:3306/aaaa?serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456
参考
基于Zookeeper动态切换数据源_BXS_0107的博客-CSDN博客
https://blog.csdn.net/newbie0107/article/details/105500579
作者:ac86
链接:https://www.jianshu.com/p/eafe3134bdc8
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。