编程题一:
在基于 Netty 的自定义RPC的案例基础上,进行改造。基于 Zookeeper 实现简易版服务的注册与发现机制
要求完成改造版本:

  1. 启动 2 个服务端,可以将IP及端口信息自动注册到 Zookeeper
  2. 客户端启动时,从Zookeeper中获取所有服务提供端节点信息,客户端与每一个服务端都建立连接
  3. 某个服务端下线后,Zookeeper注册列表会自动剔除下线的服务端节点,客户端与下线的服务端断开连接
  4. 服务端重新上线,客户端能感知到,并且与重新上线的服务端重新建立连接

编程题二:
基于作业一的基础上,实现基于 Zookeeper 的简易版负载均衡策略
要求完成改造版本:

  1. Zookeeper 记录每个服务端的最后一次响应时间,有效时间为 5秒,5s内如果该服务端没有新的请求,响应时间清零或失效
  2. 当客户端发起调用,每次都选择最后一次响应时间短的服务端进行服务调用,如果时间一致,随机选取一个服务端进行调用,从而实现负载均衡

编程题三:
基于Zookeeper实现简易版配置中心
要求实现以下功能:

  1. 创建一个 Web 项目,将数据库连接信息交给Zookeeper配置中心管理,即:当项目Web项目启动时,从 Zookeeper 进行 MySQL 配置参数的拉取
  2. 要求项目通过数据库连接池访问MySQL(连接池可以自由选择熟悉的)
  3. 当 Zookeeper 配置信息变化后Web项目自动感知,正确释放之前连接池,创建新的连接池

作业资料说明:
1、提供资料:3个代码工程、验证及讲解视频。(仓库中只有本次作业内容)
2、讲解内容包含:题目分析、实现思路、代码讲解。
3、效果视频验证:
3.1 作业1:服务端的上线与下线,客户端能动态感知,并能重新构成负载均衡。
3.2 作业2:作业完成情况下,选择性能好的服务器处理(响应时间短的服务器即为性能好)。Zookeeper记录客户端响应有效时间为5s,超时判定该客户端失效。
3.3 作业3:Zookeeper配置中心,web访问数据库需要从Zookeeper获取连接资源。当Zookeeper配置发生改变,web自动切换到新的连接资源,保持正常访问。
作业1
新增 NodeChangeListener 类

  1. public interface NodeChangeListener {
  2. /**
  3. *
  4. * @param serviceName 服务名称
  5. * @param serviceList 服务名称对应节点下的所有子节点, 目前没有用到
  6. * @param pathChildrenCacheEvent
  7. */
  8. void notify(String serviceName, List<String> serviceList, PathChildrenCacheEvent pathChildrenCacheEvent);
  9. }

将 zk 的行为抽象成接口

  1. public interface RpcRegistryHandler extends NodeChangeListener {
  2. /**
  3. * 服务端进行调用
  4. *
  5. * @param service
  6. * @param ip
  7. * @param port
  8. * @return
  9. */
  10. boolean registry(final String service, final String ip, final int port);
  11. /**
  12. * 客户端进行调用
  13. *
  14. * @param service
  15. * @return
  16. */
  17. List<String> discovery(final String service);
  18. void addListener(NodeChangeListener service);
  19. void destroy();
  20. }

ConfigKeeper 配置类

public class ConfigKeeper {
    /**
     * netty 的端口号
     */
    private int nettyPort;
    /**
     * zk 地址: ip + 端口
     */
    private String zkAddr;
    /**
     * 主动上报时间,单位 秒
     */
    private int interval;
    /**
     * 区分是客户端 还是 server 端, true 是服务端, false 是客户端
     */
    private boolean providerSide;
    // 单例类,setter 和 getter 方法
}

新增 RpcResponse 类

package com.lagou;
public class RpcResponse  {
    /**
     * 响应ID
     */
    private String requestId;
    /**
     * 错误信息
     */
    private String error;
    /**
     * 返回的结果
     */
    private Object result;
    // setter 和 getter 方法, toString方法 
}

RpcServerHandler 类,这次主要对 channelRead 的方法内容作了调整

/**
     * 服务端将数据 写入 客户端, 继续传递下去
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 222222
        RpcRequest request = (RpcRequest) msg;
        final RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setRequestId(request.getRequestId());
        System.out.println("111 接收到" + request.getRequestId());
        rpcResponse.setResult(handler(request));
        // 3333333
        ctx.writeAndFlush(rpcResponse);
    }

服务端 rpc -server 用到的配置类 RpcServerConfig

public class RpcServerConfig {
    private String nettyHost;
    private int nettyPort;
    private int delay;
    /**
     * 是否是服务端
     */
    private boolean providerSide;
    /**
     * 应用的名称
     */
    private String applicationName;
    private Map<String, Class> services;

    // setter 和 getter 方法
}

RpcServer 类
自身 implements InitializingBean, DisposableBean 接口
Autowired 了 RpcRegistryFactory 对象
主要关注 afterPropertiesSet 和 destroy 方法即可

@Override
    public void afterPropertiesSet() throws Exception {
        this.initRpcServerConfig();
        this.startServer();
    }
    @Override
    public void destroy() {
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }

这里牵涉到了RpcRegistryFactory

/**
 * 注册中心工厂类
 */
@Component
public class RpcRegistryFactory implements FactoryBean<RpcRegistryHandler>, DisposableBean {
    private RpcRegistryHandler rpcRegistryHandler;
    @Override
    public RpcRegistryHandler getObject()  {
        if (this.rpcRegistryHandler == null) {
            rpcRegistryHandler = new ZkRegistryHandler(ConfigKeeper.getInstance().getZkAddr());
        }
        return rpcRegistryHandler;
    }
    @Override
    public Class<?> getObjectType() {
        System.out.println("RpcRegistryFactory ### getObjectType.....");
        return RpcRegistryHandler.class;
    }
    @Override
    public void destroy() {
        System.out.println("RpcRegistryFactory ### destroy.....");
        rpcRegistryHandler.destroy();
    }
}

用于设计参数的 ProviderLoader

public class ProviderLoader {
    private ProviderLoader() {
    }
    /**
     * 返回类的全路径名 -> 该类的 class
     * @return
     */
    public static Map<String, Class> getInstanceCacheMap() {
        Map<String, Class> services = new HashMap<>();
        services.put(IUserService.class.getName(), IUserService.class);
        return services;
    }
}

ZkRegistryHandler 是对 RpcRegistryHandler 接口的 zk 实现。

public class ZkRegistryHandler implements RpcRegistryHandler {
    private static final String ZK_PATH_SPLITER = "/";
    private static final String LAGOU_EDU_RPC_ZK_ROOT = ZK_PATH_SPLITER + "lg-rpc-provider" + ZK_PATH_SPLITER;
    private List<NodeChangeListener> listenerList = new ArrayList<>();
    private final String url;
    private final CuratorFramework client;
    private volatile boolean closed;
    /**
     * 子节点列表
     */
    private List<String> serviceList;
    private static final ScheduledExecutorService REPORT_WORKER = Executors.newScheduledThreadPool(5);
    public ZkRegistryHandler(final String zkPath) {
        url = zkPath;
        this.client = CuratorFrameworkFactory.builder()
                .connectString(zkPath)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                if (ConnectionState.CONNECTED.equals(connectionState)) {
                    System.out.println("注册中心连接成功");
                }
            }
        });
        client.start();
        // 定时上报
        final ConfigKeeper configKeeper = ConfigKeeper.getInstance();
        final boolean providerSide = configKeeper.isProviderSide();
        final int interval = configKeeper.getInterval();
        if (!providerSide && interval > 0) {
            REPORT_WORKER.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    System.out.println("我是一个定时任务");
                    // RequestMetri
                }
            }, interval, interval, TimeUnit.SECONDS);
        }
    }
    /**
     * 服务端注册用到的方法
     * @param serviceName
     * @param nettyHost
     * @param nettyPort
     * @return
     */
    @Override
    public boolean registry(final String serviceName, final String nettyHost, final int nettyPort) {
        String zkPath = providePath(serviceName);
        if (!exists(zkPath)) {
            create(zkPath, false);
        }
        // /lg-rpc-provider/com.lagou.server.IUserService/provider/localhost:8999
        String instancePath = zkPath + ZK_PATH_SPLITER + nettyHost + ":" + nettyPort;
        create(instancePath, true);
        return true;
    }
    /**
     * 客户端查找服务的方法
     *
     * @param serviceName
     * @return
     */
    @Override
    public List<String> discovery(final String serviceName) {
        final String path = providePath(serviceName);
        if (serviceList == null || serviceList.isEmpty()) {
            System.out.println("首次查找地址");
            try {
                serviceList = client.getChildren().forPath(path);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        this.registryWatch(serviceName, path);
        return serviceList;
    }
    @Override
    public void addListener(NodeChangeListener listener) {
        listenerList.add(listener);
    }
    @Override
    public void destroy() {
        client.close();
    }
    @Override
    public void notify(String children, List<String> serviceList, PathChildrenCacheEvent pathChildrenCacheEvent) {
        for (NodeChangeListener nodeChangeListener : listenerList) {
            nodeChangeListener.notify(children, serviceList, pathChildrenCacheEvent);
        }
    }
    private void create(final String path, final boolean ephemeral) {
        CreateMode createMode;
        if (ephemeral) {
            createMode = CreateMode.EPHEMERAL;
        } else {
            createMode = CreateMode.PERSISTENT;
        }
        try {
            client.create().creatingParentsIfNeeded().withMode(createMode).forPath(path);
        } catch (KeeperException.NodeExistsException e) {
            // do nothing
            System.out.println("该路径已存在" + path);
        }
        catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
    private boolean exists(final String path) {
        try {
            if (client.checkExists().forPath(path) != null) {
                return true;
            }
        } catch (KeeperException.NoNodeException e) {
            // do nothing
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return false;
    }
    /**
     * 设置监听的方法
     *
     * @param serviceName
     * @param path
     */
    private void registryWatch(final String serviceName, final String path) {
        PathChildrenCache nodeCache = new PathChildrenCache(client, path, true);
        try {
            nodeCache.getListenable().addListener((client, pathChildrenCacheEvent) -> {
                // 更新本地緩存
                serviceList = client.getChildren().forPath(path);
                listenerList.forEach(nodeChangeListener -> {
                    System.out.println("节点变化");
                    nodeChangeListener.notify(serviceName, serviceList, pathChildrenCacheEvent);
                });
            });
            nodeCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        } catch (Exception e) {
        }
    }
    /**
     * 返回 /lg-rpc-provider/com.lagou.server.IUserService/provider
     * @param serviceName
     * @return
     */
    private String providePath(String serviceName) {
        return LAGOU_EDU_RPC_ZK_ROOT + serviceName + ZK_PATH_SPLITER + "provider";
    }
    private String metricsPath() {
        return LAGOU_EDU_RPC_ZK_ROOT + "metrics";
    }
}

rpc-server 的启动类

@SpringBootApplication
public class MyApplication {
    public static void main(String[] args) {
        // ["localhost:2181", "8999"]
        // ["localhost:2181", "9000"]
        final String zkPath = args[0];
        final int nettyPort = Integer.parseInt(args[1]);
        // 将IP及端口信息自动注册到 Zookeeper
        ConfigKeeper configKeeper = ConfigKeeper.getInstance();
        configKeeper.setProviderSide(true);
        configKeeper.setInterval(5);
        configKeeper.setNettyPort(nettyPort);
        configKeeper.setZkAddr(zkPath);
        System.out.println(configKeeper);
        SpringApplication.run(MyApplication.class, args);
        // 可以通过 ls /lg-rpc-provider/com.lagou.server.IUserService/provider 查看节点信息
    }
}

分别为 netty 启动 8888 和 8900 端口

作业 - 图1
接下来讲解 rpc-client
UserClientHandler 类可以复用
新增 RpcClient
主要对外暴露了 initClient 和 send 方法

// 2. 初始化netty客户端(创建连接池 bootstrap, 设置 BootStrap 连接服务器)
    public void initClient(String serviceClassName) throws InterruptedException {
        // 创建连接池
        this.group = new NioEventLoopGroup();
        // 创建客户端启动类
        Bootstrap bootstrap = new Bootstrap();
        // 配置启动引导类
        bootstrap.group(group)
                // 通道类型为 NIO
                .channel(NioSocketChannel.class)
                // 设置请求协议为 tcp
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                // 监听 channel 并初始化
                .handler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // 获取管道对象
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new RpcEncoder(RpcRequest.class, new JSONSerializer()));
                        pipeline.addLast(new RpcDecoder(RpcResponse.class, new JSONSerializer()));
                        // 自定义事件处理器
                        pipeline.addLast(new UserClientHandler());
                    }
                });
        this.channel = bootstrap.connect(this.nettyIp, this.nettyPort).sync().channel();
        if (!isValidate()) {
            close();
            return;
        }
        System.out.println("启动客户端" + serviceClassName + ", ip = " + this.nettyIp + ", port = " + nettyPort);
    }
    public Object send(RpcRequest request) throws InterruptedException, ExecutionException {
        // 统计请求时间
        RequestMetrics.getInstance().put(nettyIp, this.nettyPort, request.getRequestId());
        return this.channel.writeAndFlush(request).sync().get();
    }

新增 RpcConsumer 类
主要有用的方法有构造方法,createProxy的方法,notify为类自身 实现 NodeChangeListener 接口的方法(因为构造时会this.rpcRegistryHandler.addListener(this);)。

public class RpcConsumer implements NodeChangeListener {
    private final RpcRegistryHandler rpcRegistryHandler;
    private final Map<String, Class> serviceMap;
    /**
     * 服务名 -> List<RpcClient>
     */
    private final Map<String, List<RpcClient>> CLIENT_POOL = new HashMap<>();
    private LoadBalanceStrategy balanceStrategy = new RandomLoadBalance();
    /**
     * 初始化
     * @param rpcRegistryHandler
     * @param instanceCacheMap
     */
    public RpcConsumer(final RpcRegistryHandler rpcRegistryHandler, final Map<String, Class> instanceCacheMap) {
        this.rpcRegistryHandler = rpcRegistryHandler;
        this.serviceMap = instanceCacheMap;
        // 开始自动注册消费者逻辑: accept 方法
        serviceMap.forEach((className, clazz) -> {
            List<RpcClient> rpcClients = CLIENT_POOL.get(className);
            if (rpcClients == null) {
                rpcClients = new ArrayList<>();
            }
            // 127.0.0.1:8999 127.0.0.1:9000
            final List<String> discovery = this.rpcRegistryHandler.discovery(className);
            for (String s : discovery) {
                // s -> rpcClient
                final String[] split = s.split(":");
                String nettyIp = split[0];
                int nettyPort = Integer.parseInt(split[1]);
                final RpcClient rpcClient = new RpcClient(nettyIp, nettyPort);
                try {
                    rpcClient.initClient(className);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                rpcClients.add(rpcClient);
                CLIENT_POOL.put(className, rpcClients);
            }
        });
        this.rpcRegistryHandler.addListener(this);
    }
    // 4. 编写一个方法,使用 jdk 动态代理对象
    @SuppressWarnings("unchecked")
    public <T> T createProxyEnhance(final Class<T> serverClass) {
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{serverClass}, (proxy, method, args) -> {
                    final String serverClassName = serverClass.getName();
                    // 封装
                    final RpcRequest request = new RpcRequest();
                    final String requestId = UUID.randomUUID().toString().substring(0, 7);
                    String className = method.getDeclaringClass().getName();
                    String methodName = method.getName();
                    Class<?>[] parameterTypes = method.getParameterTypes();
                    request.setRequestId(requestId);
                    request.setClassName(className);
                    request.setMethodName(methodName);
                    request.setParameterTypes(parameterTypes);
                    request.setParameters(args);
                    System.out.println("******************************\n请求id = " + requestId + ", 请求方法名 = " + methodName + ", 请求参数 = " + Arrays.toString(args));
                    RpcClient rpcClient = balanceStrategy.route(CLIENT_POOL, serverClassName);
                    if (rpcClient == null) {
                        System.out.println("没找到对应服务端,返 NULL");
                        return null;
                    }
                    System.out.println(request);
                    // request 最终会客户端发送给服务端进行消费
                    return rpcClient.send(request);
                });
    }
    /**
     * 监听临时节点的变化
     *
     * @param service 服务名称
     * @param serviceList  服务名称对应节点下的所有子节点
     * @param pathChildrenCacheEvent
     */
    @Override
    public void notify(final String service, final List<String> serviceList,
                       final PathChildrenCacheEvent pathChildrenCacheEvent) {
        // 取出变化的节点名称, 例如为 /lg-rpc-provider/com.lagou.server.IUserService/provider/localhost:9000
        final String path = pathChildrenCacheEvent.getData().getPath();
        System.out.println("变化节点的路径: " + path + ", 变化的类型: " + pathChildrenCacheEvent.getType());
        // 分离出 ip:port 的组合。
        final String instanceConfig = path.substring(path.lastIndexOf("/") + 1);
        System.out.println("instanceConfig: " + instanceConfig);
        final String[] address = instanceConfig.split(":");
        System.out.println("address: " + address);
        final String nettyIp = address[0];
        final int nettyPort = Integer.parseInt(address[1]);
        List<RpcClient> rpcClients = CLIENT_POOL.get(service);
        switch (pathChildrenCacheEvent.getType()) {
            // 增加节点
            case CHILD_ADDED:
            case CONNECTION_RECONNECTED:
                if (rpcClients == null) {
                    rpcClients = new ArrayList<>();
                }
                final RpcClient rpcClient = new RpcClient(nettyIp, nettyPort);
                try {
                    rpcClient.initClient(service);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                rpcClients.add(rpcClient);
                // 节点耗时统计
                RequestMetrics.getInstance().addNode(nettyIp, nettyPort);
                System.out.println("新增节点" + instanceConfig);
                break;
            // 增加节点
            case CHILD_REMOVED:
            case CONNECTION_SUSPENDED:
            case CONNECTION_LOST:
                if (rpcClients != null) {
                    for (RpcClient client : rpcClients) {
                        if (client.getNettyIp().equals(nettyIp) && client.getNettyPort() == nettyPort) {
                            rpcClients.remove(client);
                            // 节点耗时统计
                            RequestMetrics.getInstance().remoteNode(nettyIp, nettyPort);
                            System.out.println("移除节点" + instanceConfig);
                            break;
                        }
                    }
                }
                break;
        }
    }
}

最后再讲讲 ConsumerBootStrap, 该类基本没啥改动

public class ConsumerBootStrap {
    public static void main(final String[] args) throws Exception {
        final ConfigKeeper configKeeper = ConfigKeeper.getInstance();
        configKeeper.setZkAddr(args[0]);
        // 之后会启动一个定时的线程池,每 5s 上传到注册中心
        configKeeper.setInterval(5);
        configKeeper.setProviderSide(false);
        final RpcRegistryHandler rpcRegistryHandler = new ZkRegistryHandler(configKeeper.getZkAddr());
        System.out.println("客户端 Zookeeper session established.");
        // 最后一步
        final RpcConsumer consumer = new RpcConsumer(rpcRegistryHandler, ProviderLoader.getInstanceCacheMap());
        final IUserService userService = consumer.createProxyEnhance(IUserService.class);
        while (true) {
            Thread.sleep(4900);
            final String result = userService.sayHello("are you ok?");
            // 恒为 null
            System.out.println("返回 = " + result);
        }
    }
}

作业2
作业 - 图2
1.消费者每次请求完成时更新最后一次请求耗时和系统时间
这部分工作主要在客户端做。
首先介绍一下这次用到的两个类

package com.lagou.boot;
public class Metrics {
    private String nettyIp;
    private int nettyPort;
    private long start;
    private Long cost;
    public Metrics(String nettyIp, int nettyPort, long start, Long cost) {
        this.nettyIp = nettyIp;
        this.nettyPort = nettyPort;
        this.start = start;
        this.cost = cost;
    }
    public Metrics(String nettyIp, int nettyPort, long start) {
        this(nettyIp, nettyPort, start, null);
    }
    public String getNettyIp() {
        return nettyIp;
    }
    public void setNettyIp(String nettyIp) {
        this.nettyIp = nettyIp;
    }
    public int getNettyPort() {
        return nettyPort;
    }
    public void setNettyPort(int nettyPort) {
        this.nettyPort = nettyPort;
    }
    public long getStart() {
        return start;
    }
    public void setStart(long start) {
        this.start = start;
    }
    public Long getCost() {
        return cost;
    }
    public void setCost(Long cost) {
        this.cost = cost;
    }
}

RequestMetrics 类
COST_TIME_MAP变量 ip:端口 -》 耗时
REQUEST_ID_MAP变量 requestId -> ip + 端口 + 起始时间戳 + 耗时
calculate 方法用于 根据requestId 进行耗时统计
统计请求时间 在RpcClient的 send 方法中进行

public class RequestMetrics {
    /**
     * ip:端口 -》 耗时
     */
    private static final ConcurrentHashMap<String, Long> COST_TIME_MAP = new ConcurrentHashMap<>();
    /**
     * requestId -> ip + 端口 + 起始时间戳 + 耗时
     * 每个 requestId 用完一次后就会被销毁
     */
    private static final ConcurrentHashMap<String, Metrics> REQUEST_ID_MAP = new ConcurrentHashMap<>();
    private static final RequestMetrics requestMetrics = new RequestMetrics();
    public ConcurrentHashMap<String, Long> getMetricMap() {
        return COST_TIME_MAP;
    }
    private RequestMetrics() {
    }
    public static RequestMetrics getInstance() {
        return requestMetrics;
    }
    public void addNode(String nettyIp, int nettyPort) {
        COST_TIME_MAP.put(nettyIp + ":" + nettyPort, 0L);
    }
    public void remoteNode(String nettyIp, int nettyPort) {
        COST_TIME_MAP.remove(nettyIp + ":" + nettyPort);
    }
    /**
     * 响应时放入, 根据requestId 进行耗时统计
     * @param requestId
     */
    public void calculate(String requestId) {
        final Metrics metrics = REQUEST_ID_MAP.get(requestId);
        Long cost = System.currentTimeMillis() - metrics.getStart();
        COST_TIME_MAP.put(metrics.getNettyIp() + ":" + metrics.getNettyPort(), cost);
        REQUEST_ID_MAP.remove(requestId);
    }
    /**
     * 获取所有节点耗时统计
     */
    public List<Metrics> getAllInstances() {
        List<Metrics> result = new ArrayList<>();
        COST_TIME_MAP.forEach((url, aLong) -> {
            String[] split = url.split(":");
            result.add(new Metrics(split[0], Integer.parseInt(split[1]), aLong));
        });
        return result;
    }
    /**
     * 请求时放入
     * @param nettyIp
     * @param nettyPort
     * @param requestId
     */
    public void put(String nettyIp, int nettyPort, String requestId) {
        REQUEST_ID_MAP.put(requestId, new Metrics(nettyIp, nettyPort, System.currentTimeMillis(), null));
    }
}

2.消费者定时在启动时创建定时线程池,每隔5s自动上报,更新Zookeeper临时节点的值
ConsumerBootStrap 入口有一个参数配置

// 之后会启动一个定时的线程池,每 5s 上传到注册中心
        configKeeper.setInterval(5);

ZkRegistryHandler 会开启一个 ScheduledExecutorService 线程池服务
RequestMetrics 的

// 定时上报
        final ConfigKeeper configKeeper = ConfigKeeper.getInstance();
        final boolean providerSide = configKeeper.isProviderSide();
        final int interval = configKeeper.getInterval();
        if (!providerSide && interval > 0) {
            REPORT_WORKER.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    System.out.println("我是一个定时任务");
                    // ...
                }
            }, interval, interval, TimeUnit.SECONDS);
        }
  1. 每次上报时判断当前时间距离最后一次请求是否超过5s,超过5s则需要删除Zookeeper上面的内容

这里介绍下 RequestMetrics 的 getAllInstances() 方法, 如果 5 秒内没有响应清空请求时间。

/**
     * 获取所有节点耗时统计
     */
    public List<Metrics> getAllInstances() {
        List<Metrics> result = new ArrayList<>();
        COST_TIME_MAP.forEach((url, aLong) -> {
            String[] split = url.split(":");
            result.add(new Metrics(split[0], Integer.parseInt(split[1]), aLong));
        });
        return result;
    }

接下来简单说一下负载均衡策略,这里主要涉及到了使用那个客户端进行服务的请求。
作业 - 图3

public abstract class AbstractLoadBalanceStrategy implements LoadBalanceStrategy{
    @Override
    public RpcClient route(Map<String, List<RpcClient>> clientPool, String serverClassName) {
        List<RpcClient> rpcClients = clientPool.get(serverClassName);
        if (null == rpcClients) return null;
        return doSelect(rpcClients);
    }
    protected abstract RpcClient doSelect(List<RpcClient> rpcClients);
}

MinCostLoadBalance (该类未经验证)

public class MinCostLoadBalance extends AbstractLoadBalanceStrategy {
    @Override
    protected RpcClient doSelect(final List<RpcClient> rpcClients) {
        ConcurrentHashMap<String, Long> metricMap = RequestMetrics.getInstance().getMetricMap();
        RpcClient minCostRpcClient = rpcClients.get(0);
        final Long minLong = metricMap.get(minCostRpcClient.getNettyIp() + minCostRpcClient.getNettyPort());
        for (int i = 1; i < rpcClients.size(); i++) {
            RpcClient rpcClient = rpcClients.get(i);
            String nettyIp = rpcClient.getNettyIp();
            int nettyPort = rpcClient.getNettyPort();
            // 取出最小响应时间的客户端,并进行调用
            Long aLong = metricMap.get(nettyIp + nettyPort);
            if (aLong != null && aLong < minLong) {
                minCostRpcClient = rpcClient;
            }
        }
        return minCostRpcClient;
    }
}

RandomLoadBalance (该类未经验证)

public class RandomLoadBalance extends AbstractLoadBalanceStrategy {
    private final Random random = new Random();
    @Override
    protected RpcClient doSelect(List<RpcClient> rpcClients) {
        int size = rpcClients.size();
        int index = random.nextInt(size);
        return rpcClients.get(index);
    }
}

作业 - 图4
作业3
以下项目主要使用了 commons-dbcp + fastjson + apache.curator 技术进行实现。
这里会通过create [-s][-e] path data acl命令创建节点:
建立所需节点
我会将数据库配置的用户名和密码等信息写入/dbConfig/lagou.config.DbConfig 节点中。
先建立父节点

create  /dbConfig  ""

然后若不存在临时节点则重新创建

# 向 /dbConfig/lagou.config.DbConfig 中写入配置信息
create -e /dbConfig/lagou.config.DbConfig {"username":"root","password":"123456","url":"jdbc:mysql://localhost:3306/aaaa?serverTimezone=UTC"}
# 查看是否能正常获取节点信息
get /dbConfig/lagou.config.DbConfig

更改数据

# 更改数据库为 aaaa
set /dbConfig/lagou.config.DbConfig {"username":"root","password":"123456","url":"jdbc:mysql://localhost:3306/aaaa?serverTimezone=UTC"}
# 更改数据库为 bbbb
set /dbConfig/lagou.config.DbConfig {"username":"root","password":"123456","url":"jdbc:mysql://localhost:3306/bbbb?serverTimezone=UTC"}

创建Java类

  1. 创建工具类 RuntimeContext
@Component
public class RuntimeContext implements ApplicationContextAware {
    private static ApplicationContext applicationContext = null;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (RuntimeContext.applicationContext == null) {
            RuntimeContext.applicationContext = applicationContext;
        }
    }
    //获取applicationContext
    private static ApplicationContext getApplicationContext() {
        return applicationContext;
    }
    //通过name获取Bean
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }
    // ...
}
  1. 实体类
package lagou.config;
public class DbConfig {
    private String url;
    private String username;
    private String password;
    // setter getter 方法
}
  1. 创建 MyDataSource 自定义数据源,我们重写的dbcp中的类BasicDataSource,我们将其全部拷贝了出来,然后重命名为MyDataSource类,然后在其中修改了以下内容

3.1将 UNKNOWN_TRANSACTIONISOLATION 的值改为 -1. 否则这个内部变量会找不到

/**
     * The default TransactionIsolation state of connections created by this pool.
     */
    protected volatile int defaultTransactionIsolation =            PoolableConnectionFactory.UNKNOWN_TRANSACTIONISOLATION;

3.2jdk 1.7 之后需要实现该方法 getParentLogger()

@Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        return null;
    }

3.3 修改已有的 createDataSource() 方法,删除这几行代码

作业 - 图5
3.4 新增 changeDataSource 方法

public static void changeDataSource() {
        MyDataSource dataSource = (MyDataSource) RuntimeContext.getBean("dataSource");
        try {
            dataSource.close();
            dataSource.createDataSource();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
  1. 新建 InitListener 类,该类实现了ServletContextListener 来对Zookeeper节点 /db/url 的监听
public class InitListener implements ServletContextListener {
    private static final String CONNENT_ADDR = "localhost:2181";
    private static final String PATH = "/dbConfig";
    private static final String SUB_PATH = PATH + "/" + DbConfig.class.getName();
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(CONNENT_ADDR)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        curatorFramework.start();
        PathChildrenCache nodeCache = new PathChildrenCache(curatorFramework, "/dbConfig", true);
        try {
            nodeCache.getListenable().addListener((client, pathChildrenCacheEvent) -> {
                System.out.println(pathChildrenCacheEvent.getType());
                if (PathChildrenCacheEvent.Type.CHILD_UPDATED.equals(pathChildrenCacheEvent.getType())) {
                    final ChildData data = pathChildrenCacheEvent.getData();
                    if (data != null) {
                        final String path = data.getPath();
                        System.out.println(path);
                        System.out.println(SUB_PATH);
                        if (path.equals(SUB_PATH)) {
                            MyDataSource datasource = (MyDataSource) RuntimeContext.getBean("dataSource");
                            final DbConfig dbConfig = JSON.parseObject(new String(data.getData()), DbConfig.class);
                            System.out.println(dbConfig.toString());
                            datasource.setUrl(dbConfig.getUrl());
                            datasource.setUsername(dbConfig.getUsername());
                            datasource.setPassword(dbConfig.getPassword());
                            MyDataSource.changeDataSource();
                        }
                    }
                }
            });
            nodeCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    @Override
    public void contextDestroyed(ServletContextEvent sce) {
    }
}
  1. 修改spring boot 启动类,注册 InitListener ,配置 我们自定义的 DataSource,建立一个query方法(可通过/query 进行访问)暴露出去。
@SpringBootApplication
@RestController
public class MyApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
    @RequestMapping("/query")
    public String query() {
        String sql = "select id from `info` limit 1";
        return jdbcTemplate.queryForObject(sql, String.class);
    }
    @Bean
    public ServletListenerRegistrationBean servletListenerRegistrationBean() {
        ServletListenerRegistrationBean servletListenerRegistrationBean = new ServletListenerRegistrationBean();
        servletListenerRegistrationBean.setListener(new InitListener());
        return servletListenerRegistrationBean;
    }
    @Bean
    public DataSource dataSource(@Value("${spring.datasource.url}") String url,
                                 @Value("${spring.datasource.username}") String username,
                                 @Value("${spring.datasource.password}") String password) {
        MyDataSource dataSource = new MyDataSource();
        dataSource.setUrl(url);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        return dataSource;
    }
    @Autowired
    private JdbcTemplate jdbcTemplate;
}

6.application.properties 配置

server.port=80
spring.datasource.url=jdbc:mysql://localhost:3306/aaaa?serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456

验证
http://localhost/query

参考

基于Zookeeper动态切换数据源_BXS_0107的博客-CSDN博客
https://blog.csdn.net/newbie0107/article/details/105500579

作者:ac86
链接:https://www.jianshu.com/p/eafe3134bdc8
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。