author:彭程
介绍
dble实现了AIO和NIO两种IO,本文主要介绍dble的NIO过程
NIO介绍
NIO主要有三大核心部分:Channel(通道),Buffer(缓冲区),Selector(选择器)。传统IO基于字节流和字符流进行操作,而NIO基于Channel和Buffer进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个线程可以监听多个数据通道,实现多路复用和异步化处理任务。
NIO和传统IO(一下简称IO)之间第一个最大的区别是,IO是面向流的,NIO是面向缓冲区的。

dble实现了mysql协议,以客户端与dble的front连接为例,主要步骤如下:
1、服务端发送握手包;
2、dble 处理客户端的回复包.
服务端发送握手包
dble启动类dblesever中,dble会判断是使用AIO还是NIO,判断为NIO后会分别创建前后端的读写队列,并初始化两个NIOAcceptor对象:manager和server,分别对应与管理端口和服务端口。之后会启动两个NIOAccepter对象的线程。
public final class DbleServer {...if (aio) { //判断IO类型int processorCount = frontProcessorCount + backendProcessorCount;LOGGER.info("using aio network handler ");asyncChannelGroups = new AsynchronousChannelGroup[processorCount];initAioProcessor(processorCount);connector = new AIOConnector();manager = new AIOAcceptor(NAME + "Manager", SystemConfig.getInstance().getBindIp(),SystemConfig.getInstance().getManagerPort(), 100, new ManagerConnectionFactory(), this.asyncChannelGroups[0]);server = new AIOAcceptor(NAME + "Server", SystemConfig.getInstance().getBindIp(),SystemConfig.getInstance().getServerPort(), SystemConfig.getInstance().getServerBacklog(), new ServerConnectionFactory(), this.asyncChannelGroups[0]);} else {for (int i = 0; i < frontProcessorCount; i++) {frontExecutor.execute(new RW(frontRegisterQueue));}for (int i = 0; i < backendProcessorCount; i++) {backendExecutor.execute(new RW(backendRegisterQueue));}connector = new NIOConnector(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + "NIOConnector", backendRegisterQueue);connector.start();//管理端口manager = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME + "Manager", SystemConfig.getInstance().getBindIp(),SystemConfig.getInstance().getManagerPort(), 100, new ManagerConnectionFactory(), frontRegisterQueue);//服务端口server = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME + "Server", SystemConfig.getInstance().getBindIp(),SystemConfig.getInstance().getServerPort(), SystemConfig.getInstance().getServerBacklog(), new ServerConnectionFactory(), frontRegisterQueue);}...manager.start();LOGGER.info(manager.getName() + " is started and listening on " + manager.getPort());server.start();LOGGER.info(server.getName() + " is started and listening on " + server.getPort());LOGGER.info("=====================================Server started success=======================================");}
DBLE 处理客户端 connect 的代码在 NIOAcceptor中:
public final class NIOAcceptor extends Thread implements SocketAcceptor {private static final Logger LOGGER = LoggerFactory.getLogger(NIOAcceptor.class);private static final AcceptIdGenerator ID_GENERATOR = new AcceptIdGenerator();private final int port;private final Selector selector;private final ServerSocketChannel serverChannel;private final FrontendConnectionFactory factory;private ConcurrentLinkedQueue<AbstractConnection> frontRegisterQueue;public NIOAcceptor(String name, String bindIp, int port, int backlog, FrontendConnectionFactory factory,ConcurrentLinkedQueue<AbstractConnection> frontRegisterQueue) throws IOException {super.setName(name);this.port = port;this.selector = Selector.open(); //打开选择器(Channel都会由Selector统一管理);this.serverChannel = ServerSocketChannel.open(); //打开服务端 Socket 通道,并绑定监听的端口号this.serverChannel.configureBlocking(false); //设置当前通道为非阻塞状态;//set TCP optionserverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 1024 * 16 * 2);serverChannel.bind(new InetSocketAddress(bindIp, port), backlog);this.serverChannel.register(selector, SelectionKey.OP_ACCEPT); //将服务器 Socket 通道注册进Selector,由选择器统一管理。/*SelectionKey中封装了事件的四种类型:OP_READ:可读事件;OP_WRITE:可写事件;OP_CONNECT:客户端连接服务端的事件(tcp连接),一般为创建SocketChannel客户端channel;OP_ACCEPT:服务端接收客户端连接的事件,一般为创建ServerSocketChannel服务端channel;*/this.factory = factory;this.frontRegisterQueue = frontRegisterQueue;}...}
NIOAccepter#run方法会不断检查是否有有效连接的通道,有的话就执行NIOAccepter#accept方法。
public void run() {final Selector tSelector = this.selector; //selector.select():返回已经准备就绪的通道个数(这些通道包含你感兴趣的的事件)。//比如:你对读就绪的通道感兴趣,那么select()方法就会返回读事件已经就绪的那些通道。for (; ; ) {try {tSelector.select(1000L);Set<SelectionKey> keys = tSelector.selectedKeys(); //selector.selectedKeys():获取就绪通道的事件列表。try {for (SelectionKey key : keys) {if (key.isValid() && key.isAcceptable()) { //当连接有效且可接受时,处理客户端连接accept();} else {key.cancel();}}} catch (final Throwable e) {LOGGER.warn("caught Throwable err: ", e);} finally {keys.clear();}} catch (Exception e) {LOGGER.info(getName(), e);}}}
accpt方法中成功建立了TCP连接,并将连接放入队列中,之后执行wakeupFrontedSelector()方法
private void accept() {SocketChannel channel = null;try {//与客户端建立TCP连接channel = serverChannel.accept();channel.configureBlocking(false); //调整当前通道为非阻塞模式NIOSocketWR socketWR = new NIOSocketWR();FrontendConnection c = factory.make(channel, socketWR);socketWR.initFromConnection(c);c.setId(ID_GENERATOR.getId());IOProcessor processor = DbleServer.getInstance().nextFrontProcessor();c.setProcessor(processor);frontRegisterQueue.offer(c); //将连接放入队列wakeupFrontedSelector(); //唤醒selector} catch (Exception e) {LOGGER.info(getName(), e);closeChannel(channel);}}
wakeupFrontedSelector()方法创建了一个RW对象,并唤醒了其select,RW用于读写连接中的数据
private void wakeupFrontedSelector() {Map<Thread, Runnable> threadRunnableMap = DbleServer.getInstance().getRunnableMap().get(DbleServer.FRONT_EXECUTOR_NAME);for (Map.Entry<Thread, Runnable> runnableEntry : threadRunnableMap.entrySet()) {RW rw = (RW) runnableEntry.getValue();rw.getSelector().wakeup();}}
直接来看RW的run方法,RW首先将dble与客户端的连接注册到了RW的selector中,之后一旦连接中有数据,就会进行处理
public void run() {final Selector finalSelector = this.selector;Set<SelectionKey> keys = null;if (SystemConfig.getInstance().getUseThreadUsageStat() == 1) {this.useThreadUsageStat = true;this.workUsage = new ThreadWorkUsage();DbleServer.getInstance().getThreadUsedMap().put(Thread.currentThread().getName(), workUsage);}for (; ; ) {try {...register(finalSelector); //注册selectorkeys = finalSelector.selectedKeys();if (keys.size() == 0) {continue;}executeKeys(keys); //对有相应事件的连接进行处理...}}}
private void register(Selector finalSelector) {AbstractConnection c;if (registerQueue.isEmpty()) {return;}while ((c = registerQueue.poll()) != null) {try {//将连接注册到了RW的selector多路复用选择器中,使得该选择器后续能够读取该连接发送过来的数据((NIOSocketWR) c.getSocketWR()).register(finalSelector);c.register();} catch (Exception e) {//todo 确认调用register的时候会发生什么LOGGER.warn("register err", e);}}}
dble处理客户端的回复包
当客户端返回回复包之后,从上面RW的run方法可以看到,RW会调用RW#executeKeys方法进行处理,
private void executeKeys(Set<SelectionKey> keys) {for (SelectionKey key : keys) {AbstractConnection con = null;try {Object att = key.attachment();if (att != null) {con = (AbstractConnection) att;if (con.isClosed()) {key.cancel();}if (key.isValid() && key.isReadable()) {try {con.asyncRead(); //读取客户端发送过来的数据,asynRead()异步读方法:获取缓冲区,调用通道的read方法,将数据读到缓冲区。} catch (ClosedChannelException e) {//happens when close and read running in parallel.//sometimes ,no byte could be read,but an read event triggered with zero bytes although cause this.LOGGER.info("read bytes but the connection is closed .connection is {}. May be the connection closed suddenly.", con);key.cancel();continue;}...}}}
这里调用了NIOSocketWR的asyncRead方法将客户端发送的数据进行了读取到缓存,并进行下一步处理
public void asyncRead() throws IOException {ByteBuffer theBuffer = con.findReadBuffer(); //读取客户端发送过来的数据到缓存theBuffer中int got = channel.read(theBuffer);con.onReadData(got); //处理相应的数据}
AbstractConnection#onReadData方法继续调用AbstractConnection#handle方法,对每个数据包进行处理
private void handle(ByteBuffer dataBuffer) {boolean hasRemaining = true;int offset = 0;while (hasRemaining) {ProtoHandlerResult result = proto.handle(dataBuffer, offset, isSupportCompress);//下面处理了客户端传过来的数据包switch (result.getCode()) {case PART_OF_BIG_PACKET:extraPartOfBigPacketCount++;if (!result.isHasMorePacket()) {readReachEnd();dataBuffer.clear();}break;case COMPLETE_PACKET: //完整的数据包,进入这里的处理逻辑processPacketData(result); //进行封装if (!result.isHasMorePacket()) {readReachEnd();dataBuffer.clear();}break;case BUFFER_PACKET_UNCOMPLETE:compactReadBuffer(dataBuffer, result.getOffset());break;case BUFFER_NOT_BIG_ENOUGH:ensureFreeSpaceOfReadBuffer(dataBuffer, result.getOffset(), result.getPacketLength());break;default:throw new RuntimeException("unknown error when read data");}hasRemaining = result.isHasMorePacket();if (hasRemaining) {offset = result.getOffset();}}}
AbstractConnection#processPacketData会读取客户端发送过来的握手包,并将它封装成异步任务以备下一步处理。
private void processPacketData(ProtoHandlerResult result) {//这里将读取的数据封装成task任务,提交到队列中,然后通过线程异步处理byte[] packetData = result.getPacketData();if (packetData != null) {int tmpCount = extraPartOfBigPacketCount;if (!isSupportCompress) {extraPartOfBigPacketCount = 0;pushServiceTask(new NormalServiceTask(packetData, service, tmpCount));} else {List<byte[]> packs = CompressUtil.decompressMysqlPacket(packetData, decompressUnfinishedDataQueue);if (decompressUnfinishedDataQueue.isEmpty()) {extraPartOfBigPacketCount = 0;}for (byte[] pack : packs) {if (pack.length != 0) {pushServiceTask(new NormalServiceTask(pack, service, tmpCount));}}}}}
