优点:
并发高
传输快
封装好
netty的底层是nio,所以第一阶段我们先了解nio
学习前的准备,NIO学习
传统的BIO(阻塞I/O)模型:一个线程只能处理一个客户端,客户端没有任务时线程被阻塞,资源利用率较低;即使引入多线程也同样如此
nio 非阻塞i/o模型,引入三大组件
三大组件
channel
# 管道:数据输入输出的通道
# 常见的channel有 FileChannel、ServerSocketChannel、SocketChannel
//工作在阻塞模式下FileChannel channel = new FileInputStream("1.txt").getChannel();public static void main(String[] args) {try {FileChannel from =new FileInputStream("1.txt").getChannel();FileChannel to = new FileOutputStream("2.txt").getChannel();from.transferTo(0, from.size(), to);//一次只能传输2g的文件System.out.println("ok");} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}
buffer
# 缓冲区:用来协助数据的输入与输出
ByteBuffer数据结构
private int position = 0; //buffer当前读写位置
private int limit;//buffer的读写限制
private int capacity;//buffer的大小



ByteBuffer buffer = ByteBuffer.allocate(12); //使用allocate创建并分配空间,分配的是堆内存,读写效率较低,受垃圾回收影响
ByteBuffer buffer = ByteBuffer.allocateDirect(12);//分配的是直接内存,读写效率高(少一次拷贝),不受垃圾回收影响,但分配和释放的开销较大
常用api:
buffer.put();//向缓冲区写入
buffer.get();//从缓冲区获取
buffer.flip();//切换为读模式
buffer.clear();//切换为写模式
buffer.compact();//切换为写模式,并将没读取的字节向前移动
buffer.remaining();//获得buffer中未读的字节数
buffer.hasRemaining();//判断buffer中是否还有未读的字节
buffer.rewind();//从头开始读
buffer.mark();//记录position的位置
buffer.reset();//跳转到mark标记的位置
String str = Charset.defaultCharset().decode(buffer).toString();//buffer转换为String
buffer = StandardCharsets.UTF_8.encode(str);//字符串转换为buffer
selector
非阻塞模式
server
package feizuse;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Non-blocking NIO server without a selector: it polls for new connections and
 * then polls every accepted channel for data in a busy loop.
 */
public class server {

    /**
     * Listens on port 8080 and prints whatever the connected clients send.
     *
     * @param args unused
     * @throws IOException if the server channel cannot be opened, bound or polled
     */
    public static void main(String[] args) throws IOException {
        // Create the server channel
        ServerSocketChannel server = ServerSocketChannel.open();
        // Bind the listening port
        server.bind(new InetSocketAddress(8080));
        // Accepted client channels
        List<SocketChannel> channels = new ArrayList<SocketChannel>();
        // Switch the ServerSocketChannel to non-blocking mode
        server.configureBlocking(false);
        while (true) {
            System.out.println("accept -------before");
            // Non-blocking: returns null immediately when no connection is pending
            SocketChannel sc = server.accept();
            System.out.println("accept -------after");
            if (sc != null) {
                // Only real connections are tracked; storing null would break the read loop below
                sc.configureBlocking(false);
                channels.add(sc);
            }
            // Poll every client channel for data
            Iterator<SocketChannel> clients = channels.iterator();
            while (clients.hasNext()) {
                SocketChannel s = clients.next();
                ByteBuffer buffer = ByteBuffer.allocate(16);
                System.out.println("read -------before");
                int read;
                try {
                    // Non-blocking: returns 0 when no data is available, -1 when the peer closed
                    read = s.read(buffer);
                } catch (IOException e) {
                    // Abrupt disconnect: treat it like end-of-stream instead of killing the server
                    e.printStackTrace();
                    read = -1;
                }
                System.out.println("read -------after");
                if (read == -1) {
                    closeQuietly(s);
                    clients.remove();
                    continue;
                }
                if (read > 0) {
                    buffer.flip();
                    String str = Charset.defaultCharset().decode(buffer).toString();
                    System.out.println(str + s.getLocalAddress());
                }
            }
        }
    }

    /** Closes a client channel, reporting but not propagating a failure to close. */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

client
package feizuse;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;

/**
 * Console client: sends every token typed on standard input to the server on port 8080.
 */
public class client1 {

    /**
     * Connects to port 8080 and forwards console input until standard input ends.
     *
     * @param args unused
     * @throws IOException if connecting or writing fails
     */
    public static void main(String[] args) throws IOException {
        // One Scanner for the whole session; re-creating it per loop can drop buffered input
        Scanner scanner = new Scanner(System.in);
        try (SocketChannel sc = SocketChannel.open()) {
            sc.connect(new InetSocketAddress(8080));
            // Stop cleanly at end of input instead of failing with NoSuchElementException
            while (scanner.hasNext()) {
                String str = scanner.next();
                sc.write(Charset.defaultCharset().encode(str));
            }
        }
    }
}
selector处理
package feizuse;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * Single-threaded NIO server driven by a {@link Selector}: one thread handles both
 * new connections and reads from all connected clients.
 */
public class server2 {

    /**
     * Listens on port 8080, accepts clients and reads from them until they disconnect.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Selector and server channel are closed automatically if the loop ever fails
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Bind the listening port
            server.bind(new InetSocketAddress(8080));
            // Link the ServerSocketChannel to the Selector.
            // The SelectionKey tells us which channel an event happened on.
            /*
             * accept  -- fired when a connection request arrives
             * connect -- fired once a connection has been established
             * read    -- data is readable
             * write   -- channel is writable
             */
            SelectionKey sskey = server.register(selector, 0);
            // Only interested in accept events
            sskey.interestOps(SelectionKey.OP_ACCEPT);
            while (true) {
                // Blocks until at least one event is ready
                selector.select();
                // All events that are ready
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // Selected keys are not removed automatically
                    iter.remove();
                    if (key.isAcceptable()) {
                        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                        SocketChannel schannel = channel.accept();
                        if (schannel == null) {
                            // Non-blocking accept may return null if the connection is already gone
                            continue;
                        }
                        schannel.configureBlocking(false);
                        SelectionKey skey = schannel.register(selector, 0, null);
                        skey.interestOps(SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        try {
                            // A client disconnect also shows up as a read event
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            // Returns the number of bytes read, or -1 on an orderly shutdown
                            int iscancel = channel.read(buffer);
                            if (iscancel == -1) {
                                closeQuietly(key, channel);
                                System.out.println("正常推出");
                            }
                        } catch (IOException e) {
                            // Abnormal disconnect
                            e.printStackTrace();
                            closeQuietly(key, channel);
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Cancels the key and closes its channel so the socket is not leaked. */
    private static void closeQuietly(SelectionKey key, SocketChannel channel) {
        key.cancel();
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
多线程优化
package feizuse;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * Boss thread of the multi-threaded demo: it only accepts connections and hands
 * every accepted channel over to a worker, which handles the reads.
 */
public class boss {

    /**
     * Listens on port 8080 and delegates accepted channels to worker "w1".
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // The current thread is the boss thread
        Thread.currentThread().setName("boos");
        try (ServerSocketChannel ssc = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            // Bind the port
            ssc.bind(new InetSocketAddress(8080));
            // Non-blocking mode is required before registering with a selector
            ssc.configureBlocking(false);
            // Register with the selector, interested in accept events only
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            // Create the worker(s)
            worker w1 = new worker("w1");
            while (true) {
                // Blocks until an event is ready
                selector.select();
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isAcceptable()) {
                        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                        SocketChannel schannel = channel.accept();
                        if (schannel == null) {
                            // Non-blocking accept may return null if the connection is already gone
                            continue;
                        }
                        schannel.configureBlocking(false);
                        // Register with w1's selector, interested in read events
                        w1.regesit(schannel);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

package feizuse;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Worker that owns its own thread and selector and handles read events for the
 * channels handed over by the boss.
 *
 * <p>Registrations are passed through a queue and executed on the worker thread,
 * so that {@code register} never runs concurrently with {@code select} on the
 * same selector.
 */
public class worker implements Runnable {
    // Thread that runs this worker
    private final Thread t1;
    // Selector owned by this worker
    private final Selector selector;
    // Registration tasks submitted by other threads, executed on the worker thread
    private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<Runnable>();

    /**
     * Opens the worker's selector and starts its thread.
     *
     * @param name name of the worker thread
     * @throws UncheckedIOException if the selector cannot be opened
     */
    worker(String name) {
        this.t1 = new Thread(this, name);
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            // Fail fast: starting the thread without a selector would only fail later with an NPE
            throw new UncheckedIOException(e);
        }
        t1.start();
    }

    /**
     * Queues the registration of a channel for read events and wakes the worker up
     * so that it performs the registration itself.
     *
     * @param s an accepted channel, already in non-blocking mode
     */
    void regesit(SocketChannel s) {
        System.out.println(Thread.currentThread().getName() + "注册");
        queue.add(() -> {
            try {
                s.register(selector, SelectionKey.OP_READ);
            } catch (ClosedChannelException e) {
                e.printStackTrace();
            }
        });
        selector.wakeup();
    }

    /** Event loop: runs pending registrations, then handles ready read events. */
    @Override
    public void run() {
        while (true) {
            try {
                System.out.println(Thread.currentThread().getName() + "run");
                System.out.println("阻塞");
                selector.select();
                // Drain ALL pending tasks: several wakeup() calls can collapse into a
                // single return from select(), so one poll per wake-up would stall registrations.
                Runnable task;
                while ((task = queue.poll()) != null) {
                    System.out.println(Thread.currentThread().getName() + "任务");
                    System.out.println("执行任务");
                    task.run();
                }
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isReadable()) {
                        System.out.println("有读任务");
                        SocketChannel channel = (SocketChannel) key.channel();
                        try {
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            int isclose = channel.read(buffer);
                            if (isclose == -1) {
                                // Orderly shutdown by the peer: release the key and the socket
                                closeQuietly(key, channel);
                            }
                            System.out.println(buffer.position());
                        } catch (IOException e) {
                            // Abnormal disconnect
                            closeQuietly(key, channel);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Cancels the key and closes its channel so the socket is not leaked. */
    private static void closeQuietly(SelectionKey key, SocketChannel channel) {
        key.cancel();
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Netty入门
Netty是什么
netty是异步的,事件驱动的网络编程框架
服务器
package com.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;public class server01 {public static void main(String[] args) {//服务器启动器new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new StringDecoder());}}).bind(8080);}}
客户端
package com.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

/**
 * Minimal Netty client: connects to port 8080 and sends the string "nihao".
 */
public class client01 {

    /**
     * Connects, waits for the connection to be established, then writes one message.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting for the connection
     */
    public static void main(String[] args) throws InterruptedException {
        new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        // Inbound: bytes -> String
                        nioSocketChannel.pipeline().addLast(new StringDecoder());
                        // Outbound: String -> bytes. Without an encoder the String passed to
                        // writeAndFlush below cannot be written to the socket.
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // Connect to the server
                .connect(new InetSocketAddress(8080))
                // Block until the connection is established
                .sync()
                .channel()
                .writeAndFlush("nihao");
    }
}
组件
EventLoop
#EventLoop是处理事件的类,他包含selector和thread
package com.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.charset.Charset;

/**
 * EventLoop demo: the first handler runs on the channel's NIO event loop, the
 * second handler is bound to a separate {@link DefaultEventLoopGroup}, so the two
 * print different thread names.
 */
public class server01 {

    /**
     * Starts the server on port 8080.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Server bootstrap: wires the components together
        // A separate event loop group for the second handler
        EventLoopGroup group = new DefaultEventLoopGroup(1);
        new ServerBootstrap()
                // Event loop groups: one loop for the parent (accept) side, two for the child channels
                .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                // Concrete channel implementation
                .channel(NioServerSocketChannel.class)
                // Processing logic for each accepted channel
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(buf.toString(Charset.defaultCharset()));
                                System.out.println(Thread.currentThread().getName());
                                // With several handlers the message must be passed on explicitly
                                ctx.fireChannelRead(msg);
                            }
                        }).addLast(group, new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                try {
                                    System.out.println(Thread.currentThread().getName());
                                    System.out.println(buf.toString(Charset.defaultCharset()) + "2");
                                } finally {
                                    // Last consumer of the message: it is not forwarded any further,
                                    // so the reference-counted buffer must be released here.
                                    buf.release();
                                }
                            }
                        });
                    }
                })
                .bind(8080);
    }
}
channel
//关闭连接
channel.close();
//写入内存但不立即刷出
channel.write();
//将内存的数据刷出
channel.flush();
//写入内存并立即刷出
channel.writeAndFlush("1231");
//ChannelFuture:用来处理异步操作结果的类
ChannelFuture
//connect是异步非阻塞方法,调用该方法的线程会继续向下执行代码,连接过程交给另外一个线程处理
//如果不调用sync,负责连接的线程可能还没有执行完毕,也就是channel还没有建立好,此时使用channel会出错,所以主线程需要等待连接建立成功后才能向下运行
.connect(new InetSocketAddress("localhost", 8080));//连接服务器
处理的两种方式
1:sync()是让主线程阻塞,等待nio线程执行完建立连接的过程、返回连接对象ChannelFuture后再向下执行
//sync()是main线程处理,等待结果返回后自己向下继续处理
channelFuture.sync();
Channel channel = channelFuture.channel();
channel.writeAndFlush();
2:另外一个处理方式,main调用connect后自己不等待结果,让nio线程自己处理
nio线程建立连接之后,会调用operationComplete方法,并执行里面的代码
channelFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        Channel channel = channelFuture.channel();
        channel.writeAndFlush();
    }
});
处理断开连接的善后操作
close是异步操作,所以和connect一样,我们有两种处理办法。
channel.close();//异步操作
//同步处理办法
ChannelFuture close = channel.closeFuture();
close.sync();//阻塞当前线程,等待连接关闭后主线程进行善后处理
//异步处理办法,关闭连接的线程关闭连接后执行该方法
close.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        System.out.println("善后处理");
    }
});
- 关闭eventloop
group.shutdownGracefully();//停止eventloop
Future
- jdk future ```java package com.netty;
import java.util.concurrent.*;
/**
 * Demonstrates the JDK {@link Future}: a task is submitted to a thread pool and
 * the main thread blocks on {@code get()} until the result is available.
 */
public class future01 {

    /**
     * Submits a task that returns 70 after one second and prints the result.
     *
     * @param args unused
     * @throws ExecutionException if the submitted task throws
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // JDK version of Future
        // Create the thread pool
        ExecutorService service = Executors.newFixedThreadPool(2);
        try {
            // Submit the task
            Future<Integer> future = service.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    Thread.sleep(1000);
                    return 70;
                }
            });
            // The main thread blocks here until the result is ready
            System.out.println(future.get());
        } finally {
            // Pool threads are non-daemon: without shutdown the JVM never exits
            service.shutdown();
        }
    }
}
- netty future```java
/*
 * Netty version of Future
 */
NioEventLoopGroup group = new NioEventLoopGroup(2);
EventLoop eventLoop = group.next();
// The value-returning lambda is submitted as a Callable<Integer>
Future<Integer> future = eventLoop.submit(() -> {
    Thread.sleep(1000);
    return 70;
});
// A Netty future can be consumed synchronously or asynchronously
// First option: block the calling thread until the result is ready
Integer result = future.get();
System.out.println(result);
// Second option: asynchronous, the listener is called back once the task completes
GenericFutureListener<Future<? super Integer>> onComplete =
        new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> done) throws Exception {
                System.out.println(done.getNow());
            }
        };
future.addListener(onComplete);
- promise ```java package com.netty;
import io.netty.channel.EventLoop; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.DefaultPromise;
import java.util.concurrent.ExecutionException;
public class promise { public static void main(String[] args) throws ExecutionException, InterruptedException { NioEventLoopGroup group =new NioEventLoopGroup(2); EventLoop next = group.next(); DefaultPromise
