一、netty的ByteBuf
创建ByteBuf
import io.netty.buffer.*;//非池化堆内存分配ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.heapBuffer(int initialCapacity, int maxCapacity);//池化堆内存分配ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(int initialCapacity, int maxCapacity);//默认分配(根据当前环境默认指定是池化还是非池化)ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(int initialCapacity, int maxCapacity);
使用ByteBuf
import io.netty.buffer.*;

/**
 * Shows how a ByteBuf's indices and capacity evolve while it is being filled.
 *
 * <p>The buffer starts at 8 bytes with a hard limit of 12 bytes. It is then
 * written to four times, 4 bytes at a time. The fourth write would need 16
 * bytes and is expected to fail with an {@link IndexOutOfBoundsException};
 * that failure is the point of the demonstration and is deliberately not caught.
 *
 * <p>{@code print(ByteBuf)} is a helper that is not part of this snippet.
 */
@Test
public void myBytebuf() {
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(8, 12);
    try {
        System.out.println("初始大小8字节,最大12字节");
        print(buf);

        // Four identical rounds: announce, append 4 bytes, dump the state.
        for (int round = 0; round < 4; round++) {
            System.out.println("-----填充4个字节后-----------");
            buf.writeBytes(new byte[]{1, 2, 3, 4});
            print(buf);
        }
    } finally {
        // The default allocator may hand out a pooled buffer; give it back
        // even when the last write throws.
        buf.release();
    }
}
初始大小8字节,最大12字节
buf.isReadable() :false
buf.readerIndex() :0
buf.readableBytes() 0
buf.isWritable() :true
buf.writerIndex() :0
buf.writableBytes() :8
buf.capacity() :8
buf.maxCapacity() :12
buf.isDirect() :true
-----填充4个字节后-----------
buf.isReadable() :true
buf.readerIndex() :0
buf.readableBytes() 4
buf.isWritable() :true
buf.writerIndex() :4
buf.writableBytes() :4
buf.capacity() :8
buf.maxCapacity() :12
buf.isDirect() :true
-----填充4个字节后-----------
buf.isReadable() :true
buf.readerIndex() :0
buf.readableBytes() 8
buf.isWritable() :false
buf.writerIndex() :8
buf.writableBytes() :0
buf.capacity() :8
buf.maxCapacity() :12
buf.isDirect() :true
-----填充4个字节后-----------
buf.isReadable() :true
buf.readerIndex() :0
buf.readableBytes() 12
buf.isWritable() :false
buf.writerIndex() :12
buf.writableBytes() :0
buf.capacity() :12
buf.maxCapacity() :12
buf.isDirect() :true
-----填充4个字节后-----------
java.lang.IndexOutOfBoundsException: writerIndex(12) + minWritableBytes(4) exceeds maxCapacity(12): PooledUnsafeDirectByteBuf(ridx: 0, widx: 12, cap: 12/12)
	at io.netty.buffer.AbstractByteBuf.ensureWritable0(AbstractByteBuf.java:295)
	at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:282)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1074)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1082)
	at com.bjmashibing.system.io.netty.MyNetty.myBytebuf(MyNetty.java:65)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)

Process finished with exit code -1
二、netty之client 端
2.1 cleintModeV1
import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelPipeline;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.util.CharsetUtil;import org.junit.Test;import java.net.InetSocketAddress;public class MyNetty {@Testpublic void cleintMode() throws InterruptedException {//创建时间监听器NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(1);//创建客户端NioSocketChannel nioSocketChannel = new NioSocketChannel();nioEventLoopGroup.register(nioSocketChannel);//连接服务端nioSocketChannel.connect(new InetSocketAddress("192.168.235.133",9000));//先向服务端发送“hello server”ByteBuf hello = Unpooled.copiedBuffer("hello server".getBytes());ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(hello);//注册处理器 监听服务端的响应ChannelPipeline pipeline = nioSocketChannel.pipeline();pipeline.addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//msg -> ByteBufByteBuf byteBuf = (ByteBuf)msg;CharSequence data = byteBuf.getCharSequence(0, byteBuf.readableBytes(), CharsetUtil.UTF_8);System.out.println("客户端读取到服务端"+ctx.pipeline().channel().remoteAddress().toString()+"的数据==>:"+data);}});//设置客户端为同步channelFuture.channel().closeFuture().sync();//程序能执行到这里证明连接已经关闭了System.out.println("连接关闭了");}}
打开服务端,使用命令开启一个临时监听端口
[root@192 ~]# nc -l 192.168.235.133 9000
hello server
hello client
^C
[root@192 ~]#
运行程序,获得结果:
客户端读取到服务端/192.168.235.133:9000的数据==>:
客户端读取到服务端/192.168.235.133:9000的数据==>:hello client
连接关闭了
2.2 nettyClientModeV2
import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.util.CharsetUtil;import org.junit.Test;import java.net.InetSocketAddress;public class MyNetty {@Testpublic void nettyClientMode() throws InterruptedException {//创建时间监听器NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(1);//netty的一个引导工具Bootstrap bootstrap = new Bootstrap();//设置事件循环组bootstrap.group(nioEventLoopGroup);//设置通道为NioSocketChannelbootstrap.channel(NioSocketChannel.class);//设置事件处理器bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//msg -> ByteBufByteBuf byteBuf = (ByteBuf)msg;CharSequence data = byteBuf.getCharSequence(0, byteBuf.readableBytes(), CharsetUtil.UTF_8);System.out.println("客户端读取到服务端"+ctx.pipeline().channel().remoteAddress().toString()+"的数据==>:"+data);}});}});//设置要链接的服务端地址+端口ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("192.168.235.133", 9000));//先向服务端发送“hello server”ByteBuf hello = Unpooled.copiedBuffer("hello server".getBytes());channelFuture.channel().writeAndFlush(hello);//设置程序同步channelFuture.channel().closeFuture().sync();//程序能执行到这里证明连接已经关闭了System.out.println("连接关闭了");}}
三、netty之server端
3.1 serverModeV1
import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.util.CharsetUtil;import org.junit.Test;import java.net.InetSocketAddress;public class MyNetty {@Testpublic void serverModeV1() throws InterruptedException {//创建事件监听器NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);//NIO版的socketNioServerSocketChannel nioServerSocketChannel = new NioServerSocketChannel();//注册到事件循环组中eventLoopGroup.register(nioServerSocketChannel);nioServerSocketChannel.pipeline().addLast(new MyAcceptHander().setEventLoopGroup(eventLoopGroup).setChannelHandler(new ChannelInit()));//设置监听端口ChannelFuture bind = nioServerSocketChannel.bind(new InetSocketAddress(9000));bind.sync().channel().closeFuture().sync();System.out.println("server close....");}/*** 自定义处理接收状态变化的处理器*/private class MyAcceptHander extends ChannelInboundHandlerAdapter{private EventLoopGroup eventLoopGroup;private ChannelHandler channelHandler;public MyAcceptHander setEventLoopGroup(EventLoopGroup eventLoopGroup) {this.eventLoopGroup = eventLoopGroup;return this;}public MyAcceptHander setChannelHandler(ChannelHandler channelHandler) {this.channelHandler = channelHandler;return this;}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("触发了MyAcceptHander.channelRegistered()");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("触发了MyAcceptHander.channelRead()");SocketChannel client = (SocketChannel)msg;ChannelPipeline p = client.pipeline();p.addLast(this.channelHandler);// this.eventLoopGroup.register(client);}}/*** 为啥要有一个inithandler,可以没有,但是MyInHandler就得设计成单例*/@ChannelHandler.Sharableclass ChannelInit extends 
ChannelInboundHandlerAdapter{@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {Channel client = ctx.channel();ChannelPipeline p = client.pipeline();p.addLast(new MyInHandler());//2,client::pipeline[ChannelInit,MyInHandler]ctx.pipeline().remove(this);}}/*** 就是用户自己实现的,你能说让用户放弃属性的操作吗* @ChannelHandler.Sharable 不应该被强压给coder*/class MyInHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("client registed...");}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("client active...");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;// CharSequence str = buf.readCharSequence(buf.readableBytes(), CharsetUtil.UTF_8);CharSequence str = buf.getCharSequence(0,buf.readableBytes(), CharsetUtil.UTF_8);System.out.println(str);ctx.writeAndFlush(buf);}}}
3.2 nettyServerModeV2
import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.ServerSocketChannel;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.util.CharsetUtil;import org.junit.Test;import java.net.InetSocketAddress;public class MyNetty {@Testpublic void nettyServerModeV2() throws InterruptedException {//创建事件监听器NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(eventLoopGroup,eventLoopGroup);serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new MyInHandler());}});ChannelFuture bind = serverBootstrap.bind(9000);bind.channel().closeFuture().sync();System.out.println("server close....");}/*** 自定义处理接收状态变化的处理器*/private class MyAcceptHander extends ChannelInboundHandlerAdapter{private EventLoopGroup eventLoopGroup;private ChannelHandler channelHandler;public MyAcceptHander setEventLoopGroup(EventLoopGroup eventLoopGroup) {this.eventLoopGroup = eventLoopGroup;return this;}public MyAcceptHander setChannelHandler(ChannelHandler channelHandler) {this.channelHandler = channelHandler;return this;}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("触发了MyAcceptHander.channelRegistered()");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("触发了MyAcceptHander.channelRead()");SocketChannel client = (SocketChannel)msg;ChannelPipeline p = client.pipeline();p.addLast(this.channelHandler);// this.eventLoopGroup.register(client);}}/*** 
为啥要有一个inithandler,可以没有,但是MyInHandler就得设计成单例*/@ChannelHandler.Sharableclass ChannelInit extends ChannelInboundHandlerAdapter{@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {Channel client = ctx.channel();ChannelPipeline p = client.pipeline();p.addLast(new MyInHandler());//2,client::pipeline[ChannelInit,MyInHandler]ctx.pipeline().remove(this);}}/*** 就是用户自己实现的,你能说让用户放弃属性的操作吗* @ChannelHandler.Sharable 不应该被强压给coder*/class MyInHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("client registed...");}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("client active...");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;// CharSequence str = buf.readCharSequence(buf.readableBytes(), CharsetUtil.UTF_8);CharSequence str = buf.getCharSequence(0,buf.readableBytes(), CharsetUtil.UTF_8);System.out.println(str);ctx.writeAndFlush(buf);}}}
