一、netty的ByteBuf

创建ByteBuf

  1. import io.netty.buffer.*;
  2. //Example capacities in bytes used by the three calls below
  3. int initialCapacity = 8;
  4. int maxCapacity = 12;
  5. //Unpooled heap buffer
  6. ByteBuf unpooledBuf = UnpooledByteBufAllocator.DEFAULT.heapBuffer(initialCapacity, maxCapacity);
  7. //Pooled heap buffer
  8. ByteBuf pooledBuf = PooledByteBufAllocator.DEFAULT.heapBuffer(initialCapacity, maxCapacity);
  9. //Default allocator (whether it is pooled or unpooled is decided by the current environment)
  10. ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(initialCapacity, maxCapacity);

使用ByteBuf

  1. import io.netty.buffer.*;
  2. /**
  3.  * Allocates a buffer with initial capacity 8 and maximum capacity 12, then performs four
  4.  * successive 4-byte writes, dumping the buffer state through print(buf) before the first
  5.  * write and after every write. The fourth write asks for 16 bytes in total, which is
  6.  * above the 12-byte maximum.
  7.  */
  8. @Test
  9. public void myBytebuf(){
  10.     ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(8, 12);
  11.     System.out.println("初始大小8字节,最大12字节");
  12.     print(buf);
  13.     // Same banner / write / dump sequence, repeated four times
  14.     for (int round = 0; round < 4; round++) {
  15.         System.out.println("-----填充4个字节后-----------");
  16.         buf.writeBytes(new byte[]{1,2,3,4});
  17.         print(buf);
  18.     }
  19. }

初始大小8字节,最大12字节
buf.isReadable() :false
buf.readerIndex() :0
buf.readableBytes() 0
buf.isWritable() :true
buf.writerIndex() :0
buf.writableBytes() :8
buf.capacity() :8
buf.maxCapacity() :12
buf.isDirect() :true
-----填充4个字节后-----------
buf.isReadable() :true
buf.readerIndex() :0
buf.readableBytes() 4
buf.isWritable() :true
buf.writerIndex() :4
buf.writableBytes() :4
buf.capacity() :8
buf.maxCapacity() :12
buf.isDirect() :true
-----填充4个字节后-----------
buf.isReadable() :true
buf.readerIndex() :0
buf.readableBytes() 8
buf.isWritable() :false
buf.writerIndex() :8
buf.writableBytes() :0
buf.capacity() :8
buf.maxCapacity() :12
buf.isDirect() :true
-----填充4个字节后-----------
buf.isReadable() :true
buf.readerIndex() :0
buf.readableBytes() 12
buf.isWritable() :false
buf.writerIndex() :12
buf.writableBytes() :0
buf.capacity() :12
buf.maxCapacity() :12
buf.isDirect() :true
-----填充4个字节后-----------

java.lang.IndexOutOfBoundsException: writerIndex(12) + minWritableBytes(4) exceeds maxCapacity(12): PooledUnsafeDirectByteBuf(ridx: 0, widx: 12, cap: 12/12)

  1. at io.netty.buffer.AbstractByteBuf.ensureWritable0(AbstractByteBuf.java:295)
  2. at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:282)
  3. at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1074)
  4. at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1082)
  5. at com.bjmashibing.system.io.netty.MyNetty.myBytebuf(MyNetty.java:65)
  6. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  7. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  8. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  9. at java.lang.reflect.Method.invoke(Method.java:498)
  10. at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
  11. at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
  12. at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
  13. at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
  14. at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
  15. at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
  16. at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
  17. at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
  18. at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
  19. at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
  20. at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
  21. at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
  22. at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
  23. at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
  24. at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
  25. at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
  26. at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
  27. at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)

Process finished with exit code -1

二、netty之client 端

2.1 clientModeV1

  1. import io.netty.buffer.ByteBuf;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandlerAdapter;
  6. import io.netty.channel.ChannelPipeline;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. import io.netty.util.CharsetUtil;
  10. import org.junit.Test;
  11. import java.net.InetSocketAddress;
  12. public class MyNetty {
  13.     /**
  14.      * Hand-wired Netty client: registers a NioSocketChannel on a one-thread event loop,
  15.      * connects to the server, sends "hello server", prints every reply received from the
  16.      * server and blocks until the connection is closed.
  17.      *
  18.      * @throws InterruptedException if the test thread is interrupted while waiting on a future
  19.      */
  20.     @Test
  21.     public void cleintMode() throws InterruptedException {
  22.         // Event loop group with one thread; it drives all I/O of this client
  23.         NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(1);
  24.         try {
  25.             // Create the client channel and register it with the event loop
  26.             NioSocketChannel nioSocketChannel = new NioSocketChannel();
  27.             nioEventLoopGroup.register(nioSocketChannel).sync();
  28.             // Install the inbound handler before connecting so that no server reply is missed
  29.             ChannelPipeline pipeline = nioSocketChannel.pipeline();
  30.             pipeline.addLast(new ChannelInboundHandlerAdapter() {
  31.                 @Override
  32.                 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  33.                     // msg -> ByteBuf
  34.                     ByteBuf byteBuf = (ByteBuf) msg;
  35.                     try {
  36.                         // Decodes the readable bytes only and leaves both indexes untouched
  37.                         String data = byteBuf.toString(CharsetUtil.UTF_8);
  38.                         System.out.println("客户端读取到服务端" + ctx.channel().remoteAddress() + "的数据==>:" + data);
  39.                     } finally {
  40.                         // This is the last inbound handler, so it has to release the buffer
  41.                         byteBuf.release();
  42.                     }
  43.                 }
  44.             });
  45.             // connect() is asynchronous: wait until the connection is established before writing
  46.             ChannelFuture channelFuture = nioSocketChannel.connect(new InetSocketAddress("192.168.235.133", 9000)).sync();
  47.             // Send "hello server" to the server first, encoded explicitly as UTF-8
  48.             ByteBuf hello = Unpooled.copiedBuffer("hello server", CharsetUtil.UTF_8);
  49.             nioSocketChannel.writeAndFlush(hello).sync();
  50.             // Block the test thread until the channel is closed
  51.             channelFuture.channel().closeFuture().sync();
  52.             // Reaching this line means the connection has been closed
  53.             System.out.println("连接关闭了");
  54.         } finally {
  55.             // Shut the event loop down once the client is done
  56.             nioEventLoopGroup.shutdownGracefully();
  57.         }
  58.     }
  59. }

打开服务端,使用命令开启一个临时监听端口

  1. [root@192 ~]# nc -l 192.168.235.133 9000
  2. hello server
  3. hello client
  4. ^C
  5. [root@192 ~]#

运行程序,获得结果:

客户端读取到服务端/192.168.235.133:9000的数据==>: 客户端读取到服务端/192.168.235.133:9000的数据==>:hello client

连接关闭了

2.2 nettyClientModeV2

  1. import io.netty.bootstrap.Bootstrap;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.*;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.nio.NioSocketChannel;
  7. import io.netty.util.CharsetUtil;
  8. import org.junit.Test;
  9. import java.net.InetSocketAddress;
  10. public class MyNetty {
  11.     /**
  12.      * Bootstrap-based Netty client: connects to the server, sends "hello server", prints
  13.      * every reply received from the server and blocks until the connection is closed.
  14.      *
  15.      * @throws InterruptedException if the test thread is interrupted while waiting on a future
  16.      */
  17.     @Test
  18.     public void nettyClientMode() throws InterruptedException {
  19.         // Event loop group with one thread
  20.         NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(1);
  21.         try {
  22.             // Netty's client-side bootstrap helper
  23.             Bootstrap bootstrap = new Bootstrap();
  24.             // Event loop group that serves the channel
  25.             bootstrap.group(nioEventLoopGroup);
  26.             // Use NioSocketChannel as the channel implementation
  27.             bootstrap.channel(NioSocketChannel.class);
  28.             // Handler that sets up the pipeline of the newly created channel
  29.             bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
  30.                 @Override
  31.                 protected void initChannel(NioSocketChannel ch) throws Exception {
  32.                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
  33.                         @Override
  34.                         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  35.                             // msg -> ByteBuf
  36.                             ByteBuf byteBuf = (ByteBuf) msg;
  37.                             try {
  38.                                 // Decodes the readable bytes only and leaves both indexes untouched
  39.                                 String data = byteBuf.toString(CharsetUtil.UTF_8);
  40.                                 System.out.println("客户端读取到服务端" + ctx.channel().remoteAddress() + "的数据==>:" + data);
  41.                             } finally {
  42.                                 // This is the last inbound handler, so it has to release the buffer
  43.                                 byteBuf.release();
  44.                             }
  45.                         }
  46.                     });
  47.                 }
  48.             });
  49.             // connect() is asynchronous: wait until the connection is established before writing
  50.             ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("192.168.235.133", 9000)).sync();
  51.             // Send "hello server" to the server first, encoded explicitly as UTF-8
  52.             ByteBuf hello = Unpooled.copiedBuffer("hello server", CharsetUtil.UTF_8);
  53.             channelFuture.channel().writeAndFlush(hello).sync();
  54.             // Block the test thread until the channel is closed
  55.             channelFuture.channel().closeFuture().sync();
  56.             // Reaching this line means the connection has been closed
  57.             System.out.println("连接关闭了");
  58.         } finally {
  59.             // Shut the event loop down once the client is done
  60.             nioEventLoopGroup.shutdownGracefully();
  61.         }
  62.     }
  63. }

三、netty之server端

3.1 serverModeV1

  1. import io.netty.bootstrap.Bootstrap;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.*;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.SocketChannel;
  7. import io.netty.channel.socket.nio.NioServerSocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. import io.netty.util.CharsetUtil;
  10. import org.junit.Test;
  11. import java.net.InetSocketAddress;
  12. public class MyNetty {
  13.     /**
  14.      * Hand-wired Netty server: registers a NioServerSocketChannel on a one-thread event
  15.      * loop, installs MyAcceptHander to prepare accepted connections, binds port 9000 and
  16.      * blocks until the server channel is closed.
  17.      *
  18.      * @throws InterruptedException if the test thread is interrupted while waiting on a future
  19.      */
  20.     @Test
  21.     public void serverModeV1() throws InterruptedException {
  22.         // Event loop group with one thread, shared by the server channel and its clients
  23.         NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
  24.         try {
  25.             // NIO server socket channel
  26.             NioServerSocketChannel nioServerSocketChannel = new NioServerSocketChannel();
  27.             // Register it with the event loop group
  28.             eventLoopGroup.register(nioServerSocketChannel);
  29.             nioServerSocketChannel.pipeline().addLast(new MyAcceptHander().setEventLoopGroup(eventLoopGroup).setChannelHandler(new ChannelInit()));
  30.             // Bind the listening port, then wait until the server channel is closed
  31.             ChannelFuture bind = nioServerSocketChannel.bind(new InetSocketAddress(9000));
  32.             bind.sync().channel().closeFuture().sync();
  33.             System.out.println("server close....");
  34.         } finally {
  35.             // Shut the event loop down once the server is done
  36.             eventLoopGroup.shutdownGracefully();
  37.         }
  38.     }
  39.     /**
  40.      * Handler installed on the server channel; it prepares every accepted client channel.
  41.      */
  42.     private class MyAcceptHander extends ChannelInboundHandlerAdapter {
  43.         // Event loop group that accepted client channels are registered with
  44.         private EventLoopGroup eventLoopGroup;
  45.         // Handler added to the pipeline of every accepted client channel
  46.         private ChannelHandler channelHandler;
  47.         /** Stores the event loop group and returns this handler for call chaining. */
  48.         public MyAcceptHander setEventLoopGroup(EventLoopGroup eventLoopGroup) {
  49.             this.eventLoopGroup = eventLoopGroup;
  50.             return this;
  51.         }
  52.         /** Stores the per-client handler and returns this handler for call chaining. */
  53.         public MyAcceptHander setChannelHandler(ChannelHandler channelHandler) {
  54.             this.channelHandler = channelHandler;
  55.             return this;
  56.         }
  57.         @Override
  58.         public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  59.             System.out.println("触发了MyAcceptHander.channelRegistered()");
  60.         }
  61.         @Override
  62.         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  63.             System.out.println("触发了MyAcceptHander.channelRead()");
  64.             // The inbound message is cast to the accepted client channel
  65.             SocketChannel client = (SocketChannel) msg;
  66.             ChannelPipeline p = client.pipeline();
  67.             p.addLast(this.channelHandler);
  68.             // Without this registration the client channel has no event loop and none of its events fire
  69.             this.eventLoopGroup.register(client);
  70.         }
  71.     }
  72.     /**
  73.      * One-shot initializer shared by all client channels: it adds a fresh MyInHandler to the
  74.      * channel and then removes itself. The init handler is optional, but without it
  75.      * MyInHandler itself would have to be designed as a shared singleton.
  76.      */
  77.     @ChannelHandler.Sharable
  78.     class ChannelInit extends ChannelInboundHandlerAdapter {
  79.         @Override
  80.         public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  81.             Channel client = ctx.channel();
  82.             ChannelPipeline p = client.pipeline();
  83.             p.addLast(new MyInHandler()); // 2. client pipeline: [ChannelInit, MyInHandler]
  84.             p.remove(this);
  85.             // Replay the event from the head so the freshly added MyInHandler receives it too
  86.             p.fireChannelRegistered();
  87.         }
  88.     }
  89.     /**
  90.      * The handler written by the user. Users cannot be expected to give up instance fields,
  91.      * so {@code @ChannelHandler.Sharable} should not be forced on them.
  92.      */
  93.     class MyInHandler extends ChannelInboundHandlerAdapter {
  94.         @Override
  95.         public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  96.             System.out.println("client registed...");
  97.         }
  98.         @Override
  99.         public void channelActive(ChannelHandlerContext ctx) throws Exception {
  100.             System.out.println("client active...");
  101.         }
  102.         @Override
  103.         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  104.             ByteBuf buf = (ByteBuf) msg;
  105.             // Decodes the readable bytes only and leaves both indexes untouched, so buf can be echoed
  106.             String str = buf.toString(CharsetUtil.UTF_8);
  107.             System.out.println(str);
  108.             // Echo the received bytes back to the client
  109.             ctx.writeAndFlush(buf);
  110.         }
  111.     }
  112. }

3.2 nettyServerModeV2

  1. import io.netty.bootstrap.Bootstrap;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.Unpooled;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.ServerSocketChannel;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioServerSocketChannel;
  10. import io.netty.channel.socket.nio.NioSocketChannel;
  11. import io.netty.util.CharsetUtil;
  12. import org.junit.Test;
  13. import java.net.InetSocketAddress;
  14. public class MyNetty {
  15.     /**
  16.      * ServerBootstrap-based Netty server: binds port 9000, adds a MyInHandler to every
  17.      * accepted client channel and blocks until the server channel is closed.
  18.      *
  19.      * @throws InterruptedException if the test thread is interrupted while waiting on a future
  20.      */
  21.     @Test
  22.     public void nettyServerModeV2() throws InterruptedException {
  23.         // Event loop group with one thread, used both for accepting and for client I/O
  24.         NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
  25.         try {
  26.             ServerBootstrap serverBootstrap = new ServerBootstrap();
  27.             serverBootstrap.group(eventLoopGroup, eventLoopGroup);
  28.             serverBootstrap.channel(NioServerSocketChannel.class);
  29.             serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
  30.                 @Override
  31.                 protected void initChannel(NioSocketChannel ch) throws Exception {
  32.                     ch.pipeline().addLast(new MyInHandler());
  33.                 }
  34.             });
  35.             // Wait for the bind to finish so that a bind failure is thrown here instead of
  36.             // showing up as a normal "server close...." message
  37.             ChannelFuture bind = serverBootstrap.bind(9000).sync();
  38.             bind.channel().closeFuture().sync();
  39.             System.out.println("server close....");
  40.         } finally {
  41.             // Shut the event loop down once the server is done
  42.             eventLoopGroup.shutdownGracefully();
  43.         }
  44.     }
  45.     /**
  46.      * Hand-written handler that prepares every accepted client channel.
  47.      * Not referenced by nettyServerModeV2 in this listing.
  48.      */
  49.     private class MyAcceptHander extends ChannelInboundHandlerAdapter {
  50.         // Event loop group that accepted client channels are registered with
  51.         private EventLoopGroup eventLoopGroup;
  52.         // Handler added to the pipeline of every accepted client channel
  53.         private ChannelHandler channelHandler;
  54.         /** Stores the event loop group and returns this handler for call chaining. */
  55.         public MyAcceptHander setEventLoopGroup(EventLoopGroup eventLoopGroup) {
  56.             this.eventLoopGroup = eventLoopGroup;
  57.             return this;
  58.         }
  59.         /** Stores the per-client handler and returns this handler for call chaining. */
  60.         public MyAcceptHander setChannelHandler(ChannelHandler channelHandler) {
  61.             this.channelHandler = channelHandler;
  62.             return this;
  63.         }
  64.         @Override
  65.         public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  66.             System.out.println("触发了MyAcceptHander.channelRegistered()");
  67.         }
  68.         @Override
  69.         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  70.             System.out.println("触发了MyAcceptHander.channelRead()");
  71.             // The inbound message is cast to the accepted client channel
  72.             SocketChannel client = (SocketChannel) msg;
  73.             ChannelPipeline p = client.pipeline();
  74.             p.addLast(this.channelHandler);
  75.             // Without this registration the client channel has no event loop and none of its events fire
  76.             this.eventLoopGroup.register(client);
  77.         }
  78.     }
  79.     /**
  80.      * One-shot initializer shared by all client channels: it adds a fresh MyInHandler to the
  81.      * channel and then removes itself. The init handler is optional, but without it
  82.      * MyInHandler itself would have to be designed as a shared singleton.
  83.      * Not referenced by nettyServerModeV2 in this listing.
  84.      */
  85.     @ChannelHandler.Sharable
  86.     class ChannelInit extends ChannelInboundHandlerAdapter {
  87.         @Override
  88.         public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  89.             Channel client = ctx.channel();
  90.             ChannelPipeline p = client.pipeline();
  91.             p.addLast(new MyInHandler()); // 2. client pipeline: [ChannelInit, MyInHandler]
  92.             p.remove(this);
  93.             // Replay the event from the head so the freshly added MyInHandler receives it too
  94.             p.fireChannelRegistered();
  95.         }
  96.     }
  97.     /**
  98.      * The handler written by the user. Users cannot be expected to give up instance fields,
  99.      * so {@code @ChannelHandler.Sharable} should not be forced on them.
  100.      */
  101.     class MyInHandler extends ChannelInboundHandlerAdapter {
  102.         @Override
  103.         public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  104.             System.out.println("client registed...");
  105.         }
  106.         @Override
  107.         public void channelActive(ChannelHandlerContext ctx) throws Exception {
  108.             System.out.println("client active...");
  109.         }
  110.         @Override
  111.         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  112.             ByteBuf buf = (ByteBuf) msg;
  113.             // Decodes the readable bytes only and leaves both indexes untouched, so buf can be echoed
  114.             String str = buf.toString(CharsetUtil.UTF_8);
  115.             System.out.println(str);
  116.             // Echo the received bytes back to the client
  117.             ctx.writeAndFlush(buf);
  118.         }
  119.     }
  120. }