优点:
并发高
传输快
封装好
netty的底层是nio,所以第一阶段我们先了解nio
学习前的准备,NIO学习
传统的bio 阻塞i/o模型,一个线程只能处理一个客户端,客户端没任务时线程阻塞,资源利用率较低,引入线程同样如此
nio 非阻塞i/o模型,引入三大组件
三大组件
channel
# 管道 数据输入输出的通道的通道
# 常见的channel有
FileChannel
ServerSocketChannel
SocketChannel
//工作在阻塞模式下
FileChannel channel = new FileInputStream("1.txt").getChannel();
public static void main(String[] args) {
try {
FileChannel from =new FileInputStream("1.txt").getChannel();
FileChannel to = new FileOutputStream("2.txt").getChannel();
from.transferTo(0, from.size(), to);//一次只能传输2g的文件
System.out.println("ok");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
buffer
# 缓冲区 用来协助数据输入与输出
ByteBuffer
数据结构
private int position = 0; //buffer当前读写位置
private int limit;//buffer的读写限制
private int capacity;//buffer的大小
ByteBuffer buffer = ByteBuffer.allocate(12); //使用allocate创建并分配空间,分配堆内存,读写效率高
ByteBuffer buffer = ByteBuffer.allocateDirect(12);//分配的是直接内存,读写效率低,且不受垃圾回收影响
常用api:
buffer.put();//向缓冲区写入
buffer.get();//从缓冲区获取
buffer.flip();//切换为读模式
buffer.clear();//切换为写模式
buffer.compact();//切换为写模式,并将没读取的字节向前移动
buffer.remaining();//获得buffer中未读的字节数
buffer.hasRemaining();//判断buffer是否全部读取
buffer.rewind();//从头开始读
buffer.mark();//记录position的位置
buffer.reset();//跳转到mark标记的位置
String str = Charset.defaultCharset().decode(buffer).toString();//buffer转换为String
buffer = StandardCharsets.UTF_8.encode(str);//字符串转换为buffer
selector
阻塞模式
server
package feizuse;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
public class server {
public static void main(String[] args) throws IOException {
//创建服务器
ServerSocketChannel server = ServerSocketChannel.open();
//绑定监听端口
server.bind(new InetSocketAddress(8080));
//创建数组用来存放客户端channel
List<SocketChannel> channels = new ArrayList<SocketChannel>();
server.configureBlocking(false);//ServerSocketChannel改为非阻塞模式
while(true) {
System.out.println("accept -------before");
SocketChannel sc = server.accept(); //阻塞方法
System.out.println("accept -------after");
channels.add(sc);
if(sc != null) {
sc.configureBlocking(false);//SocketChannel非阻塞模式
}
//读取channel里面的数据
for(SocketChannel s :channels) {
ByteBuffer buffer = ByteBuffer.allocate(16);
System.out.println("read -------before");
s.read(buffer);//阻塞方法
System.out.println("read -------after");
buffer.flip();
String str = Charset.defaultCharset().decode(buffer).toString();
System.out.println(str+s.getLocalAddress());
buffer.clear();
}
}
}
}
client
package feizuse;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;
public class client1 {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress(8080));
while(true) {
Scanner scanner =new Scanner(System.in);
String str = scanner.next();
sc.write(Charset.defaultCharset().encode(str));
}
}
}
selector处理
package feizuse;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class server2 {
public static void main(String[] args){
try {
//创建一个Selector
Selector selector =Selector.open();
//创建服务器
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
//绑定监听端口
server.bind(new InetSocketAddress(8080));
//建立Selector与ServerSocketChannel的连接
//SelectionKey可以知道是哪个客户端发生的事件
/**
* accept --连接时触发
* connect --连接建立后触发
* read --可读事件
* write --可写事件
*/
SelectionKey sskey = server.register(selector, 0);
//只关注连接事件
sskey.interestOps(SelectionKey.OP_ACCEPT);
while(true) {
selector.select();//阻塞方法,没有事件就阻塞
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();//获取所有事件的合集
//如果是连接事件
while(iter.hasNext()) {
SelectionKey key =iter.next();
iter.remove();
if(key.isAcceptable()) {
ServerSocketChannel channel =(ServerSocketChannel) key.channel();
SocketChannel schannel = channel.accept();
schannel.configureBlocking(false);
SelectionKey skey = schannel.register(selector, 0, null);
skey.interestOps(SelectionKey.OP_READ);
}else if(key.isReadable()) {
try {
/**
* 客户端断开连接会添加一个read事件
*/
SocketChannel channel=(SocketChannel) key.channel();
ByteBuffer buffer =ByteBuffer.allocate(16);
int iscancel =channel.read(buffer);//正常情况下返回传输的字节,正常推出返回-1
if(iscancel==-1) {
key.cancel();
System.out.println("正常推出");
}
}catch(IOException e) {
e.printStackTrace();
//异常退出
key.cancel();
}
}
}
}
}catch(IOException e) {
e.printStackTrace();
}
}
}
多线程优化
package feizuse;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class boss {
public static void main(String[] args) {
//当前是boos线程
Thread.currentThread().setName("boos");
try {
//建立服务端
ServerSocketChannel ssc = ServerSocketChannel.open();
//绑定接口
ssc.bind(new InetSocketAddress(8080));
ssc.configureBlocking(false);//开启非阻塞模式
Selector selector =Selector.open();
SelectionKey sskey = ssc.register(selector,SelectionKey.OP_ACCEPT);//注册selector并关注连接事件
//创建指定数量的worker
worker w1 = new worker("w1");
while(true) {
selector.select();//阻塞方法等待事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();//获取所有事件的合集
while(iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if(key.isAcceptable()) {
//如果是连接事件
ServerSocketChannel channel =(ServerSocketChannel) key.channel();
SocketChannel schannel = channel.accept();
//给该channel注册key
schannel.configureBlocking(false);
//注册w1的selector并关注读事件
w1.regesit(schannel);
}
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
package feizuse;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
public class worker implements Runnable{
//处理读事件
private Thread t1;//当前线程
private Selector selector;//selector
private ConcurrentLinkedQueue<Runnable> queue =new ConcurrentLinkedQueue<Runnable>();
worker(String name){
this.t1 =new Thread(this,name);
try {
this.selector =Selector.open();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
t1.start();
}
void regesit(SocketChannel s) {
System.out.println(Thread.currentThread().getName()+"注册");
queue.add(()->{
try {
s.register(selector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
selector.wakeup();
}
@Override
public void run() {
while(true) {
try {
System.out.println(Thread.currentThread().getName()+"run");
System.out.println("阻塞");
selector.select();
Runnable task = queue.poll();
if(task!=null) {
System.out.println(Thread.currentThread().getName()+"任务");
System.out.println("执行任务");
task.run();
}
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()) {
SelectionKey key =iter.next();
iter.remove();
if(key.isReadable()) {
System.out.println("有读任务");
try {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer=ByteBuffer.allocate(16);
int isclose = channel.read(buffer);
if(isclose ==-1) {
key.cancel();
}
System.out.println(buffer.position());
}catch(IOException e) {
key.cancel();
}
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
Netty入门
Netty是什么
netty是异步的,事件驱动的网络编程框架
服务器
package com.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
public class server01 {
public static void main(String[] args) {
//服务器启动器
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringDecoder());
}
}).bind(8080);
}
}
客户端
package com.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import java.net.InetSocketAddress;
public class client01 {
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringDecoder());
}
}).connect(new InetSocketAddress(8080))//连接服务器
.sync()
.channel()
.writeAndFlush("nihao");
}
}
组件
EventLoop
#EventLoop是处理事件的类,他包含selector和thread
package com.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.Charset;
public class server01 {
public static void main(String[] args) {
//服务器启动器,用来装配组件
//创建一个独立的eventgroup
EventLoopGroup group = new DefaultEventLoopGroup(1);
new ServerBootstrap()
//装配eventloop组,
.group(new NioEventLoopGroup(1),new NioEventLoopGroup(2))
//channel具体实现
.channel(NioServerSocketChannel.class)
//eventloop处理逻辑
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
System.out.println(Thread.currentThread().getName());
//多个hander需要将消息向下传递
ctx.fireChannelRead(msg);
}
}).addLast(group,new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName());
System.out.println(buf.toString(Charset.defaultCharset())+"2");
}
});
}
})
.bind(8080);
}
}
channel
//关闭连接
channel.close();
//写入内存但不立即刷出
channel.write();
//将内存的数据刷出
channel.flush();
//写入内存并立即刷出
channel.writeAndFlush("1231");
//用来处理异步处理的结果的类
ChannelFuture
//connect是异步非阻塞方法,调用该方法的线程会继续向下执行代码,连接过程交给另外一个线程处理
如果不调用sync,连接的线程并没有执行完毕,也就是不存在channel,会出错,主线程需要等待连接建立成功后才能向下运行
.connect(new InetSocketAddress("localhost", 8080));
//连接服务器
处理的两种方式
1:sync()是让主线程阻塞,等待nio线程执行完建立连接的过程返回连接对象ChannelFuture后在向下执行
//sync()是main线程处理,等待结果返回后自己向下继续处理
channelFuture.sync();
Channel channel = channelFuture.channel();
channel.writeAndFlush();
2:另外一个处理方式,main调用connect后自己不等待结果,让nio线程自己处理
nio建立连接之后,会继续调用operationComplete方法,并执行里面的代码
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
Channel channel = channelFuture.channel();
channel.writeAndFlush();
}
});
处理断开连接的善后操作
close是异步操作,所以和connect一样,我们有两种处理办法。
channel.close();//异步操作
//同步处理办法
ChannelFuture close = channel.closeFuture();
close.sync();//阻塞当前线程,等待连接关闭后主线程进行善后处理
//异步处理办法,关闭连接的线程关闭连接后执行该方法
close.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("善后处理");
}
});
- 关闭eventloop
group.shutdownGracefully();//停止eventloop
Future
- jdk future ```java package com.netty;
import java.util.concurrent.*;
public class future01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* jdk版本的future
*/
//创建线程池
ExecutorService service = Executors.newFixedThreadPool(2);
Future<Integer> future = service.submit(new Callable<Integer>() {
//提交任务
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
return 70;
}
});
//主线程接收结果
System.out.println(future.get());
}
}
- netty future
```java
/**
* netty 版本的future
*/
NioEventLoopGroup group = new NioEventLoopGroup(2);
EventLoop next = group.next();
Future<Integer> future = next.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
return 70;
}
});
//netty有同步和异步两种
System.out.println(future.get());
//第二种异步
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
System.out.println(future.getNow());
}
});
- promise ```java package com.netty;
import io.netty.channel.EventLoop; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.DefaultPromise;
import java.util.concurrent.ExecutionException;
public class promise { public static void main(String[] args) throws ExecutionException, InterruptedException { NioEventLoopGroup group =new NioEventLoopGroup(2); EventLoop next = group.next(); DefaultPromise