本文介绍Java的网络编程, 如何从BIO走到NIO再走到今天的Netty
1. BIO
就是最基本的Socket操作, 可以随便找一本大学Java教材, 一般都会有
public static void main(String[] args) throws IOException {
//...
ServerSocket server = new ServerSocket(8080);
while (true) {
Socket client = server.accept(); //主线程: 阻塞操作1
//一般情况下, 在accept之后会把这个client交给其他线程处理
InputStream inputStream = client.getInputStream(); //其他线程: 阻塞操作2
}
//... 省略若干代码
}
2. 第一代 Java NIO
NIO的出现主要是为了解决BIO的问题
优点:
- 解决了BIO线程数量限制, 个位数的线程就可以处理大量的连接
- 将建立连接 和 接收数据进行了区分
public static void main(String[] args) throws IOException {
LinkedList<SocketChannel> clientList = new LinkedList<>();
ServerSocketChannel ss = ServerSocketChannel.open();
ss.bind(new InetSocketAddress(8080));
ss.configureBlocking(false);//设置为非阻塞
while (true) {
//--------------------主线程----------------------------
//这里的代码块, 实际上就是可以用一个线程轮询, 是否有新的连接进入
SocketChannel client = ss.accept();//非阻塞, 可能返回null
if (client != null) {
//这里非空,就表示有新的连接进入
client.configureBlocking(false);//设置为非阻塞
clientList.add(client);
}
//--------------------主线程----------------------------
//--------------------工作线程(池)----------------------------
ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
//这里可以在其他线程池处理, 用一个或者几个线程疯狂遍历已经连接的客户端, 检查有没有新的数据进来
//当然这里的代码就是在主线程中遍历,并读取数据,也是可以的, 就是效率肯定没有另外拉个线程读取数据来得高
for (SocketChannel c : clientList) {//!!!!!!!!!!!!!!注意这里的轮询非常浪费CPU, 需要每个连接都去检查
int num = c.read(buffer);//!!!!!!!!!!! 从客户端读取数据, 虽然这一步不是阻塞的, 但是这一步会发生用户态到内核态的转换
if (num > 0) {//读得到就处理, 读不到就拉倒
buffer.flip();
byte[] aaa = new byte[buffer.limit()];
buffer.get(aaa);
String b = new String(aaa);
System.out.println(b);
buffer.clear();
}
}
//--------------------工作线程(池)----------------------------
}
}
缺点:
- 建立连接的线程,和读取数据的线程, 都需要不停的轮询, 造成了大量的无效的操作, 浪费了大量CPU资源
- accept和read函数实际上会导致一次用户态到内核态的切换,也会浪费CPU
3. 第二代 Java NIO
使用selector的NIO:
用一条线程绑定多路复用器来处理所有连接(包括accept事件,read事件,write事件等)
优点:
- 不在需要不停的轮询了, 当发生连接或者读取事件的时候, 是由多路复用器来通知我们, 节约了CPU资源; (这里可以选择 1.阻塞等待多路复用器 或者 2.轮询查看多路复用器)
public class SelectorNIO_1 {
public static void main(String[] args) throws IOException {
SocketMultiplexSingleThreadV1 service = new SocketMultiplexSingleThreadV1();
service.start();
}
/**
* 单线程处理多路复用器的demo
*/
static class SocketMultiplexSingleThreadV1 {
private ServerSocketChannel server = null;
private Selector selector = null;
private int port = 8080;
public void initServer() throws IOException {
server = ServerSocketChannel.open();
server.configureBlocking(false);//非阻塞
server.bind(new InetSocketAddress(port));
selector = Selector.open();//创建一个多路复用器
server.register(selector, SelectionKey.OP_ACCEPT);//向多路复用器 注册server, 并且设置监听ACCEPT事件
}
public void start() throws IOException {
//1. 启动服务器,同时注册号多路复用器
initServer();
while (true) {
//2. 从多路复用器中拿需要处理的文件描述符
int readyNum = selector.select();//会阻塞,直到有响应事件, 这个阻塞是针对多路复用器的,要注意!!
//select():阻塞到至少有一个通道在你注册的事件上就绪了。
//select(long timeout):和select()一样,但最长阻塞时间为timeout毫秒。
//selectNow():非阻塞,只要有通道就绪就立刻返回。
if (readyNum > 0) {
//3. 遍历需要处理的key
Set<SelectionKey> selectionKeys = selector.selectedKeys();//获取有事件的key, 这个实际上拿到的是多路复用器中的一个成员变量, 搞完只要要把key remove出去, 非常麻烦
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();//这里需要remove掉, 如果不remove掉,下次循环还会遍历到这个key, 设计非常傻逼
//4. 处理这个key
if (key.isAcceptable()) {
acceptHandler(key);//建立连接操作
} else if (key.isReadable()) {
readHandler(key);//读取数据曹邹
}
}
}
}
}
//监听到客户端的连接请求事件之后
private void acceptHandler(SelectionKey key) throws IOException {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//1. 创建连接
SocketChannel client = ssc.accept();//建立连接
client.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocateDirect(8192);
//2. 将这个连接和一个buffer绑定到一起, buffer当做key的attachment存进去
//3. 向多路复用器注册监听 该连接的读取事件
SelectionKey selectionKey = client.register(selector, SelectionKey.OP_READ, buffer);//向多路复用器绑定,读取事件
//这个selectionKey和到时候发生读取事件读取的key是一个对象, 这里暂时没用它
System.out.println("新客户端: " + client.getRemoteAddress());
}
private void readHandler(SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();//这个就是在建立连接的时候,当做attachment存进去的buffer
buffer.clear();
int read;
while (true) {
read = client.read(buffer);
if (read > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
//这里是把数据写回客户端
//这里可以写其他的自己的业务逻辑
client.write(buffer);
}
buffer.clear();
} else if (read == 0) {
break;
} else { //-1 可能是客户端关闭了, close_wait
client.close();
break;
}
}
}
}
}
缺点:
- 由于只有一个线程在处理连接, 当我们线程处理读写事件的时候, 没法立刻对连接事件作出响应
- 多个读写事件都在等待一个线程处理, 也就无法将网卡的IO能力完全发挥出来
4. 第三代Java NIO
使用Selector的NIO
�1个boss线程专门处理accept事件
n个worker线程专门处理读写事件
�优点:
- 1个线程专门处理
ACCEPT
事件, 对新连接的响应非常快 - 专门有n个线程, 来处理读写事件, 效率非常的高
- 思想上, 这种设计是能够最大的压榨机器性能的
public class SelectorNIO_2 {
public static void main(String[] args) throws IOException, InterruptedException {
//new的时候,就创建了boss线程和worker线程,只是没有start
SocketMultiplexingThreads socketMultiplexingThreads = new SocketMultiplexingThreads(9090, 4);
//start boss线程和worker线程, 此时可以接收请求了
socketMultiplexingThreads.start();
}
//包装了boss线程和worker线程的创建初始化过程
static class SocketMultiplexingThreads {
//服务
private ServerSocketChannel server;
//boss线程
private BossThread bossThread;
//worker线程
private List<WorkerThread> workerThreads;
//服务绑定的端口号
private int port;
//构造方法, 同时初始化server, bossThread, workerThread
public SocketMultiplexingThreads(int port, int workerNum) throws IOException {
this.port = port;
server = ServerSocketChannel.open();
server.configureBlocking(false);//非阻塞
server.bind(new InetSocketAddress(this.port));
this.bossThread = new BossThread(server, this);
//创建boss线程
workerThreads = new ArrayList<>(workerNum);
for (int i = 0; i < workerNum; i++) {
//创建worker线程
workerThreads.add(new WorkerThread(server));
}
}
//粗糙的启动方法
public void start() throws InterruptedException {
bossThread.start();
for (WorkerThread workerThread : workerThreads) {
workerThread.start();
}
Thread.sleep(1000 * 60 * 60);
}
//随机获取一个worker线程
WorkerThread randomGetWorkerThread() {
Random random = new Random();
int index = random.nextInt(workerThreads.size());
return workerThreads.get(index);
}
}
//boss线程只处理accept事件
static class BossThread extends Thread {
private Selector bossSelector;
private SocketMultiplexingThreads socketMultiplexingThreads;
public BossThread(ServerSocketChannel server, SocketMultiplexingThreads socketMultiplexingThreads) throws IOException {
this.socketMultiplexingThreads = socketMultiplexingThreads;
//创建Boss多路复用器
this.bossSelector = Selector.open();
//向主线程注册ACCEPT事件
server.register(bossSelector, SelectionKey.OP_ACCEPT);//向多路复用器 注册server, 并且设置监听ACCEPT事件
System.out.println("Boss 线程创建完毕");
}
@lombok.SneakyThrows
@Override
public void run() {
//boss线程不停的轮询, 等待新的连接, 专门处理accept事件
while (true) {
int readyNum = bossSelector.select(100);
if (readyNum > 0) {
Set<SelectionKey> selectionKeys = bossSelector.selectedKeys();//获取有事件的key, 这个实际上拿到的是多路复用器中的一个成员变量, 搞完只要要把key remove出去, 非常麻烦
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
System.out.println("boss 线程处理事件");
SelectionKey key = iterator.next();
iterator.remove();//这里需要remove掉, 如果不remove掉,下次循环还会遍历到这个key, 设计非常傻逼
//Boss线程只处理Accept事件
if (key.isAcceptable()) {
acceptHandler(key);//建立连接操作
}
}
}
}
}
//处理accept事件,将读事件绑定到worker线程上去
private void acceptHandler(SelectionKey key) throws IOException {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//1. 创建连接
SocketChannel client = ssc.accept();//建立连接
client.configureBlocking(false);
//2. 分配buffer给这个连接, 其实这里如果把分配内存放到worker线程中去做, boss能处理更多的连接, 这里就不演示了, 不然代码就不清晰了
ByteBuffer buffer = ByteBuffer.allocateDirect(8192);
//3. 获取到worker线程的多路复用器
WorkerThread workerThread = socketMultiplexingThreads.randomGetWorkerThread();//这里用的是随机获取一个, 算法可以自己实现
Selector workerSelector = workerThread.getWorkerSelector();
//4. 向多路复用器注册监听 该连接的读取事件
SelectionKey selectionKey = client.register(workerSelector, SelectionKey.OP_READ, buffer);//向多路复用器绑定,读取事件
//这个selectionKey和到时候发生读取事件读取的key是一个对象, 这里暂时没用它
System.out.println("向worker线程注册读取事件: " + client.getRemoteAddress());
}
}
//worker线程只处理读写事件
static class WorkerThread extends Thread {
private Selector workerSelector;
private static AtomicInteger accumulator = new AtomicInteger(0);
private int id = accumulator.getAndIncrement();//作为线程的标识
public WorkerThread(ServerSocketChannel server) throws IOException {
//创建Worker多路复用器
this.workerSelector = Selector.open();
System.out.println(this + " worker 线程创建完毕");
}
public Selector getWorkerSelector() {
return this.workerSelector;
}
@SneakyThrows
@Override
public void run() {
//worker线程不停的轮询, 等待新的连接, 专门处理读事件
while (true) {
int readyNum = workerSelector.select(100);
if (readyNum > 0) {
Set<SelectionKey> selectionKeys = workerSelector.selectedKeys();//获取有事件的key, 这个实际上拿到的是多路复用器中的一个成员变量, 搞完只要要把key remove出去, 非常麻烦
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
System.out.println(this + " worker 线程处理事件");
SelectionKey key = iterator.next();
iterator.remove();//这里需要remove掉, 如果不remove掉,下次循环还会遍历到这个key, 设计非常傻逼
//Boss线程只处理Accept事件
if (key.isReadable()) {
readHandler(key);//建立连接操作
}
}
}
}
}
//worker线程处理读写事件
private void readHandler(SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();//这个就是在建立连接的时候,当做attachment存进去的buffer
buffer.clear();
int read;
while (true) {
read = client.read(buffer);
if (read > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
//这里是把数据写回客户端
//这里可以写其他的自己的业务逻辑
client.write(buffer);
}
buffer.clear();
} else if (read == 0) {
break;
} else { //-1 可能是客户端关闭了, close_wait
client.close();
break;
}
}
}
@Override
public String toString() {
return "worker线程[" + id + "]";
}
}
}
缺点:
- 需要自己直接使用NIO的接口, 那么意味着可能会遇到各种坑
- 已经有Netty这种成熟框架包装了NIO的接口, 没有必要自己再写一套基于NIO的框架(但是方便帮助自己理解netty的思想)
5. Netty的使用
- Netty对JDK的NIO又再进行了一次包装, 底层还是调用JDK的NIO接口进行操作
- Netty的设计哲学是将所有网络操作抽象成 事件, 再将事件交给 EventLoop 去处理; (包括建立连接,读,写 都是事件)
-
5.1 单线程Netty
用一个线程去处理所有 事件 ```java public class NettyIO_1 {
public static void main(String[] args) throws InterruptedException {
//boss线程只设置1个
NioEventLoopGroup boss = new NioEventLoopGroup(1);//group可以设置多个线程,但是我们这只设置了1个
ServerBootstrap boot = new ServerBootstrap();
//注意,这里特地写成两个boss
//意味着: boss线程不但处理accept事件,也处理读写事件
boot.group(boss, boss)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new MyInbound());
}
});
ChannelFuture future = boot.bind(8080).sync();
future.channel().closeFuture().sync();
}
static class MyInbound extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//todo
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//todo
}
}
}
<a name="anM7H"></a>
## 5.2 多线程Netty
```java
public class NettyIO_2 {
public static void main(String[] args) throws InterruptedException {
//boss线程设置4个
//线程既是boss也是worker
NioEventLoopGroup boss = new NioEventLoopGroup(4);
ServerBootstrap boot = new ServerBootstrap();
boot.group(boss, boss)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new MyInbound());
}
});
ChannelFuture future = boot.bind(8080).sync();
future.channel().closeFuture().sync();
}
static class MyInbound extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);//echo的作用, 将读到的东西,写回客户端
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
}
5.3 Netty的经典使用
- 在多线程的基础上,将线程分为boss和worker
- boss 专门用来处理连接事件, 只有一个线程
worker 专门用来处理读写事件, 可以有多个线程 ```java public class NettyIO_3 {
public static void main(String[] args) throws InterruptedException {
//bossGroup, 专门处理连接事件
NioEventLoopGroup boss = new NioEventLoopGroup(99);//即使这里传入了99, 会创建99个EventLoop对象, 但是线程被使用的只有一个, 也就是说只有一个线程处理ACCEPT事件
//workerGroup 专门处理读写事件
NioEventLoopGroup worker = new NioEventLoopGroup(20);//20个线程用来处理读写事件
ServerBootstrap boot = new ServerBootstrap();
boot.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new MyInbound());
}
});
ChannelFuture future = boot.bind(8080).sync();
future.channel().closeFuture().sync();
}
static class MyInbound extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);//echo的作用, 将读到的东西,写回客户端
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
} ```