author:彭程


Introduction

dble implements two kinds of IO, AIO and NIO. This article walks through dble's NIO flow.

An overview of NIO

NIO has three core parts: Channel, Buffer, and Selector. Traditional IO operates on byte streams and character streams, while NIO operates on channels and buffers: data is always read from a channel into a buffer, or written from a buffer into a channel. A Selector monitors events on multiple channels (for example: a connection opening, or data arriving). A single thread can therefore watch many channels at once, giving us multiplexing and asynchronous task handling.
The first major difference between NIO and traditional IO (hereafter simply "IO") is that IO is stream-oriented, while NIO is buffer-oriented.
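The Channel/Buffer/Selector interplay described above can be sketched with a small self-contained example. It uses a `Pipe` instead of a network socket so that it runs without a remote peer; the class and method names are illustrative, not dble code:

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

public class SelectorDemo {
    public static String roundTrip(String msg) throws Exception {
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);  // selectable channels must be non-blocking
        Selector selector = Selector.open();
        pipe.source().register(selector, SelectionKey.OP_READ);

        // write into the sink end; the data becomes readable on the source end
        pipe.sink().write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));

        selector.select();                       // blocks until the source channel is readable
        ByteBuffer buf = ByteBuffer.allocate(64);
        for (SelectionKey key : selector.selectedKeys()) {
            if (key.isReadable()) {
                pipe.source().read(buf);         // channel -> buffer
            }
        }
        selector.selectedKeys().clear();         // selected keys must be cleared by hand
        buf.flip();                              // switch the buffer from writing to reading
        return StandardCharsets.UTF_8.decode(buf).toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("hello"));
    }
}
```

Note the two manual steps that stream-based IO never needs: clearing the selected-key set and flipping the buffer before reading it back out.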
[Figure 1: dble's IO flow]
dble implements the MySQL protocol. Taking the front connection between a client and dble as an example, the main steps are:
1. the server sends a handshake packet;
2. dble processes the client's reply packet.

The server sends the handshake packet

In dble's startup class, DbleServer, dble decides whether to use AIO or NIO. If NIO is chosen, it creates the front-end and back-end register queues and initializes two NIOAcceptor objects, manager and server, corresponding to the manager port and the server port respectively. It then starts both NIOAcceptor threads.

```java
public final class DbleServer {
    ...
    if (aio) { // decide which IO type to use
        int processorCount = frontProcessorCount + backendProcessorCount;
        LOGGER.info("using aio network handler ");
        asyncChannelGroups = new AsynchronousChannelGroup[processorCount];
        initAioProcessor(processorCount);
        connector = new AIOConnector();
        manager = new AIOAcceptor(NAME + "Manager", SystemConfig.getInstance().getBindIp(),
                SystemConfig.getInstance().getManagerPort(), 100, new ManagerConnectionFactory(), this.asyncChannelGroups[0]);
        server = new AIOAcceptor(NAME + "Server", SystemConfig.getInstance().getBindIp(),
                SystemConfig.getInstance().getServerPort(), SystemConfig.getInstance().getServerBacklog(), new ServerConnectionFactory(), this.asyncChannelGroups[0]);
    } else {
        for (int i = 0; i < frontProcessorCount; i++) {
            frontExecutor.execute(new RW(frontRegisterQueue));
        }
        for (int i = 0; i < backendProcessorCount; i++) {
            backendExecutor.execute(new RW(backendRegisterQueue));
        }
        connector = new NIOConnector(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + "NIOConnector", backendRegisterQueue);
        connector.start();
        // manager port
        manager = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME + "Manager", SystemConfig.getInstance().getBindIp(),
                SystemConfig.getInstance().getManagerPort(), 100, new ManagerConnectionFactory(), frontRegisterQueue);
        // server port
        server = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME + "Server", SystemConfig.getInstance().getBindIp(),
                SystemConfig.getInstance().getServerPort(), SystemConfig.getInstance().getServerBacklog(), new ServerConnectionFactory(), frontRegisterQueue);
    }
    ...
    manager.start();
    LOGGER.info(manager.getName() + " is started and listening on " + manager.getPort());
    server.start();
    LOGGER.info(server.getName() + " is started and listening on " + server.getPort());
    LOGGER.info("=====================================Server started success=======================================");
}
```

The code where dble handles a client connect lives in NIOAcceptor:

```java
public final class NIOAcceptor extends Thread implements SocketAcceptor {
    private static final Logger LOGGER = LoggerFactory.getLogger(NIOAcceptor.class);
    private static final AcceptIdGenerator ID_GENERATOR = new AcceptIdGenerator();
    private final int port;
    private final Selector selector;
    private final ServerSocketChannel serverChannel;
    private final FrontendConnectionFactory factory;
    private ConcurrentLinkedQueue<AbstractConnection> frontRegisterQueue;

    public NIOAcceptor(String name, String bindIp, int port, int backlog, FrontendConnectionFactory factory,
                       ConcurrentLinkedQueue<AbstractConnection> frontRegisterQueue) throws IOException {
        super.setName(name);
        this.port = port;
        this.selector = Selector.open(); // open the selector (all channels are managed through it)
        this.serverChannel = ServerSocketChannel.open(); // open the server socket channel and bind the listening port
        this.serverChannel.configureBlocking(false); // put the channel into non-blocking mode
        //set TCP option
        serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 1024 * 16 * 2);
        serverChannel.bind(new InetSocketAddress(bindIp, port), backlog);
        this.serverChannel.register(selector, SelectionKey.OP_ACCEPT); // register the server socket channel with the selector
        /*
        SelectionKey defines four event types:
        OP_READ: the channel is readable;
        OP_WRITE: the channel is writable;
        OP_CONNECT: a client has connected to the server (TCP), usually on a client-side SocketChannel;
        OP_ACCEPT: the server can accept a client connection, usually on a server-side ServerSocketChannel;
        */
        this.factory = factory;
        this.frontRegisterQueue = frontRegisterQueue;
    }
    ...
}
```

NIOAcceptor#run keeps checking for channels with a valid, acceptable connection; when one is found, it calls NIOAcceptor#accept.

```java
public void run() {
    final Selector tSelector = this.selector; // selector.select(): returns the number of channels that are ready
    // for the events you registered interest in; e.g. if you registered interest in reads,
    // select() covers the channels whose read event is ready.
    for (; ; ) {
        try {
            tSelector.select(1000L);
            Set<SelectionKey> keys = tSelector.selectedKeys(); // selector.selectedKeys(): the event keys of the ready channels
            try {
                for (SelectionKey key : keys) {
                    if (key.isValid() && key.isAcceptable()) { // valid and acceptable: handle the client connection
                        accept();
                    } else {
                        key.cancel();
                    }
                }
            } catch (final Throwable e) {
                LOGGER.warn("caught Throwable err: ", e);
            } finally {
                keys.clear();
            }
        } catch (Exception e) {
            LOGGER.info(getName(), e);
        }
    }
}
```

The accept method completes the TCP connection, puts the new connection into the register queue, and then calls wakeupFrontedSelector().

```java
private void accept() {
    SocketChannel channel = null;
    try {
        // establish the TCP connection with the client
        channel = serverChannel.accept();
        channel.configureBlocking(false); // switch the accepted channel to non-blocking mode
        NIOSocketWR socketWR = new NIOSocketWR();
        FrontendConnection c = factory.make(channel, socketWR);
        socketWR.initFromConnection(c);
        c.setId(ID_GENERATOR.getId());
        IOProcessor processor = DbleServer.getInstance().nextFrontProcessor();
        c.setProcessor(processor);
        frontRegisterQueue.offer(c); // enqueue the connection
        wakeupFrontedSelector(); // wake up the RW selectors
    } catch (Exception e) {
        LOGGER.info(getName(), e);
        closeChannel(channel);
    }
}
```

wakeupFrontedSelector() looks up every front-end RW object and wakes up its selector. RW is the thread responsible for reading and writing the data on connections.

```java
private void wakeupFrontedSelector() {
    Map<Thread, Runnable> threadRunnableMap = DbleServer.getInstance().getRunnableMap().get(DbleServer.FRONT_EXECUTOR_NAME);
    for (Map.Entry<Thread, Runnable> runnableEntry : threadRunnableMap.entrySet()) {
        RW rw = (RW) runnableEntry.getValue();
        rw.getSelector().wakeup();
    }
}
```
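The handoff pattern here — the acceptor enqueues a connection and wakes the RW selector, which then drains the queue before handling ready keys — can be sketched as follows. `Conn` and the method names are hypothetical stand-ins, not dble code; the demo runs single-threaded because a wakeup issued before select() makes the next select() return immediately:

```java
import java.nio.channels.Selector;
import java.util.concurrent.ConcurrentLinkedQueue;

public class HandoffDemo {
    // hypothetical stand-in for AbstractConnection in the real code
    static final class Conn {
        final int id;
        Conn(int id) { this.id = id; }
    }

    public static int handOff(int n) throws Exception {
        ConcurrentLinkedQueue<Conn> registerQueue = new ConcurrentLinkedQueue<>();
        Selector selector = Selector.open();

        // acceptor side: enqueue the new connections, then wake the RW selector
        for (int i = 0; i < n; i++) {
            registerQueue.offer(new Conn(i));
        }
        selector.wakeup(); // a pending wakeup makes the next select() return at once

        // RW side: select() returns immediately because of the wakeup,
        // then the register queue is drained before ready keys are handled
        selector.select();
        int registered = 0;
        Conn c;
        while ((c = registerQueue.poll()) != null) {
            registered++; // in dble this is where channel.register(selector, ...) happens
        }
        selector.close();
        return registered;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(handOff(3));
    }
}
```

The wakeup matters because an RW thread may be parked inside select() with no ready events; without it, a freshly accepted connection would wait up to a full select timeout before being registered.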

Looking directly at RW's run method: RW first registers the connection between dble and the client with its own selector; from then on, whenever the connection has data, RW processes it.

```java
public void run() {
    final Selector finalSelector = this.selector;
    Set<SelectionKey> keys = null;
    if (SystemConfig.getInstance().getUseThreadUsageStat() == 1) {
        this.useThreadUsageStat = true;
        this.workUsage = new ThreadWorkUsage();
        DbleServer.getInstance().getThreadUsedMap().put(Thread.currentThread().getName(), workUsage);
    }
    for (; ; ) {
        try {
            ...
            register(finalSelector); // register newly queued connections with the selector
            keys = finalSelector.selectedKeys();
            if (keys.size() == 0) {
                continue;
            }
            executeKeys(keys); // handle the connections whose events are ready
            ...
        }
    }
}

private void register(Selector finalSelector) {
    AbstractConnection c;
    if (registerQueue.isEmpty()) {
        return;
    }
    while ((c = registerQueue.poll()) != null) {
        try {
            // register the connection with the RW thread's selector, so that this
            // selector can later read the data the connection sends over
            ((NIOSocketWR) c.getSocketWR()).register(finalSelector);
            c.register();
        } catch (Exception e) {
            //todo: confirm what happens when register is called
            LOGGER.warn("register err", e);
        }
    }
}
```

dble processes the client's reply packet

Once the client returns its reply packet, as the RW run method above shows, RW calls RW#executeKeys to process it.

```java
private void executeKeys(Set<SelectionKey> keys) {
    for (SelectionKey key : keys) {
        AbstractConnection con = null;
        try {
            Object att = key.attachment();
            if (att != null) {
                con = (AbstractConnection) att;
                if (con.isClosed()) {
                    key.cancel();
                }
                if (key.isValid() && key.isReadable()) {
                    try {
                        // read the data the client sent: asyncRead() obtains a buffer
                        // and calls the channel's read method to fill it
                        con.asyncRead();
                    } catch (ClosedChannelException e) {
                        //happens when close and read running in parallel.
                        //sometimes, no byte can be read, but a read event is triggered with zero bytes, which causes this.
                        LOGGER.info("read bytes but the connection is closed .connection is {}. May be the connection closed suddenly.", con);
                        key.cancel();
                        continue;
                    }
                    ...
                }
            }
        }
    }
}
```

Here NIOSocketWR's asyncRead method reads the data sent by the client into a buffer and hands it over for further processing.

```java
public void asyncRead() throws IOException {
    ByteBuffer theBuffer = con.findReadBuffer(); // obtain this connection's read buffer
    int got = channel.read(theBuffer); // read the data the client sent into the buffer
    con.onReadData(got); // process what was read
}
```
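The `got` value that `channel.read(...)` returns and `onReadData` receives encodes the connection state: a positive count means bytes were copied into the buffer, `0` means a non-blocking channel had nothing to read, and `-1` means the peer closed. A small sketch (using a `Pipe` instead of a socket so it runs standalone; the class name is illustrative):

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class ReadReturnDemo {
    // returns {read with data available, read with empty pipe, read after peer close}
    public static int[] readStates() throws Exception {
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);
        ByteBuffer buf = ByteBuffer.allocate(16);

        pipe.sink().write(ByteBuffer.wrap(new byte[]{1, 2, 3}));
        int withData = pipe.source().read(buf);   // > 0: bytes were copied into the buffer

        buf.clear();
        int empty = pipe.source().read(buf);      // 0: nothing available on a non-blocking channel

        pipe.sink().close();
        int afterClose = pipe.source().read(buf); // -1: peer closed; the connection should be released

        return new int[]{withData, empty, afterClose};
    }

    public static void main(String[] args) throws Exception {
        int[] r = readStates();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```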

AbstractConnection#onReadData then calls AbstractConnection#handle, which processes each packet:

```java
private void handle(ByteBuffer dataBuffer) {
    boolean hasRemaining = true;
    int offset = 0;
    while (hasRemaining) {
        ProtoHandlerResult result = proto.handle(dataBuffer, offset, isSupportCompress);
        // dispatch on the state of the packet the client sent
        switch (result.getCode()) {
            case PART_OF_BIG_PACKET:
                extraPartOfBigPacketCount++;
                if (!result.isHasMorePacket()) {
                    readReachEnd();
                    dataBuffer.clear();
                }
                break;
            case COMPLETE_PACKET: // a complete packet takes this branch
                processPacketData(result); // wrap it into a task
                if (!result.isHasMorePacket()) {
                    readReachEnd();
                    dataBuffer.clear();
                }
                break;
            case BUFFER_PACKET_UNCOMPLETE:
                compactReadBuffer(dataBuffer, result.getOffset());
                break;
            case BUFFER_NOT_BIG_ENOUGH:
                ensureFreeSpaceOfReadBuffer(dataBuffer, result.getOffset(), result.getPacketLength());
                break;
            default:
                throw new RuntimeException("unknown error when read data");
        }
        hasRemaining = result.isHasMorePacket();
        if (hasRemaining) {
            offset = result.getOffset();
        }
    }
}
```
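The framing that `proto.handle` performs follows the MySQL wire format: every packet starts with a 4-byte header, a 3-byte little-endian payload length followed by a 1-byte sequence id. A simplified splitter (not dble's actual ProtoHandler, and ignoring big-packet and compression cases) illustrates the idea:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class PacketFraming {
    // splits a buffer of MySQL packets into payloads; stops on an incomplete tail
    public static List<byte[]> split(ByteBuffer buf) {
        List<byte[]> payloads = new ArrayList<>();
        while (buf.remaining() >= 4) {
            // 3-byte little-endian payload length
            int len = (buf.get() & 0xFF) | ((buf.get() & 0xFF) << 8) | ((buf.get() & 0xFF) << 16);
            buf.get();                   // 1-byte sequence id, skipped here
            if (buf.remaining() < len) { // incomplete packet: dble's BUFFER_PACKET_UNCOMPLETE case
                break;
            }
            byte[] payload = new byte[len];
            buf.get(payload);
            payloads.add(payload);
        }
        return payloads;
    }

    public static void main(String[] args) {
        // two packets: payload {0x01} with seq 0, payload {0x0e, 0x00} with seq 1
        ByteBuffer buf = ByteBuffer.wrap(new byte[]{
                1, 0, 0, 0, 0x01,
                2, 0, 0, 1, 0x0e, 0x00});
        System.out.println(split(buf).size());
    }
}
```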

AbstractConnection#processPacketData reads the handshake packet the client sent and wraps it into an asynchronous task for the next stage of processing.

```java
private void processPacketData(ProtoHandlerResult result) {
    // wrap the packet bytes into a task, push it onto a queue,
    // and let a worker thread process it asynchronously
    byte[] packetData = result.getPacketData();
    if (packetData != null) {
        int tmpCount = extraPartOfBigPacketCount;
        if (!isSupportCompress) {
            extraPartOfBigPacketCount = 0;
            pushServiceTask(new NormalServiceTask(packetData, service, tmpCount));
        } else {
            List<byte[]> packs = CompressUtil.decompressMysqlPacket(packetData, decompressUnfinishedDataQueue);
            if (decompressUnfinishedDataQueue.isEmpty()) {
                extraPartOfBigPacketCount = 0;
            }
            for (byte[] pack : packs) {
                if (pack.length != 0) {
                    pushServiceTask(new NormalServiceTask(pack, service, tmpCount));
                }
            }
        }
    }
}
```
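The push-to-queue / worker-thread pattern that `pushServiceTask` relies on can be sketched with a minimal example. This is a hypothetical stand-in, not dble's actual task executor; the point is that the IO thread only enqueues, and a separate worker drains the queue:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ServiceTaskDemo {
    // hypothetical stand-in for NormalServiceTask handling: just counts packets
    public static int processAsync(byte[][] packets) throws Exception {
        BlockingQueue<byte[]> taskQueue = new LinkedBlockingQueue<>();
        AtomicInteger handled = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(packets.length);

        // worker thread drains the queue, decoupling IO threads from packet handling
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    taskQueue.take(); // blocks until a task is pushed
                    handled.incrementAndGet();
                    done.countDown();
                }
            } catch (InterruptedException ignored) {
                // daemon worker: let the thread exit
            }
        });
        worker.setDaemon(true);
        worker.start();

        for (byte[] p : packets) {
            taskQueue.offer(p); // the pushServiceTask analogue: enqueue and return at once
        }
        done.await(); // wait here only so the demo can report a deterministic result
        return handled.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(processAsync(new byte[][]{{1}, {2}, {3}}));
    }
}
```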