一、netty的ByteBuf
创建ByteBuf
import io.netty.buffer.*;
//非池化堆内存分配
ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.heapBuffer(int initialCapacity, int maxCapacity);
//池化堆内存分配
ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(int initialCapacity, int maxCapacity);
//默认分配(根据当前环境默认指定是池化还是非池化)
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(int initialCapacity, int maxCapacity);
使用ByteBuf
import io.netty.buffer.*;
@Test
public void myBytebuf(){
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(8, 12);
System.out.println("初始大小8字节,最大12字节");
print(buf);
System.out.println("-----填充4个字节后-----------");
buf.writeBytes(new byte[]{1,2,3,4});
print(buf);
System.out.println("-----填充4个字节后-----------");
buf.writeBytes(new byte[]{1,2,3,4});
print(buf);
System.out.println("-----填充4个字节后-----------");
buf.writeBytes(new byte[]{1,2,3,4});
print(buf);
System.out.println("-----填充4个字节后-----------");
buf.writeBytes(new byte[]{1,2,3,4});
print(buf);
}
初始大小8字节,最大12字节buf.isReadable() :false buf.readerIndex() :0 buf.readableBytes() 0 buf.isWritable() :true buf.writerIndex() :0 buf.writableBytes() :8 buf.capacity() :8 buf.maxCapacity() :12 buf.isDirect() :true ——-填充4个字节后—————- buf.isReadable() :true buf.readerIndex() :0 buf.readableBytes() 4 buf.isWritable() :true buf.writerIndex() :4 buf.writableBytes() :4 buf.capacity() :8 buf.maxCapacity() :12 buf.isDirect() :true ——-填充4个字节后—————- buf.isReadable() :true buf.readerIndex() :0 buf.readableBytes() 8 buf.isWritable() :false buf.writerIndex() :8 buf.writableBytes() :0 buf.capacity() :8 buf.maxCapacity() :12 buf.isDirect() :true ——-填充4个字节后—————- buf.isReadable() :true buf.readerIndex() :0 buf.readableBytes() 12 buf.isWritable() :false buf.writerIndex() :12 buf.writableBytes() :0 buf.capacity() :12 buf.maxCapacity() :12 buf.isDirect() :true ——-填充4个字节后—————-
java.lang.IndexOutOfBoundsException: writerIndex(12) + minWritableBytes(4) exceeds maxCapacity(12): PooledUnsafeDirectByteBuf(ridx: 0, widx: 12, cap: 12/12)
at io.netty.buffer.AbstractByteBuf.ensureWritable0(AbstractByteBuf.java:295)
at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:282)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1074)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1082)
at com.bjmashibing.system.io.netty.MyNetty.myBytebuf(MyNetty.java:65)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Process finished with exit code -1
二、netty之client 端
2.1 cleintModeV1
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import org.junit.Test;
import java.net.InetSocketAddress;
public class MyNetty {
@Test
public void cleintMode() throws InterruptedException {
//创建时间监听器
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(1);
//创建客户端
NioSocketChannel nioSocketChannel = new NioSocketChannel();
nioEventLoopGroup.register(nioSocketChannel);
//连接服务端
nioSocketChannel.connect(new InetSocketAddress("192.168.235.133",9000));
//先向服务端发送“hello server”
ByteBuf hello = Unpooled.copiedBuffer("hello server".getBytes());
ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(hello);
//注册处理器 监听服务端的响应
ChannelPipeline pipeline = nioSocketChannel.pipeline();
pipeline.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//msg -> ByteBuf
ByteBuf byteBuf = (ByteBuf)msg;
CharSequence data = byteBuf.getCharSequence(0, byteBuf.readableBytes(), CharsetUtil.UTF_8);
System.out.println("客户端读取到服务端"+ctx.pipeline().channel().remoteAddress().toString()+"的数据==>:"+data);
}
});
//设置客户端为同步
channelFuture.channel().closeFuture().sync();
//程序能执行到这里证明连接已经关闭了
System.out.println("连接关闭了");
}
}
打开服务端,使用命令开启一个临时监听端口
[root@192 ~]# nc -l 192.168.235.133 9000
hello server
hello client
^C
[root@192 ~]#
运行程序,获得结果:
客户端读取到服务端/192.168.235.133:9000的数据==>: 客户端读取到服务端/192.168.235.133:9000的数据==>:hello client
连接关闭了
2.2 nettyClientModeV2
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import org.junit.Test;
import java.net.InetSocketAddress;
public class MyNetty {
@Test
public void nettyClientMode() throws InterruptedException {
//创建时间监听器
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(1);
//netty的一个引导工具
Bootstrap bootstrap = new Bootstrap();
//设置事件循环组
bootstrap.group(nioEventLoopGroup);
//设置通道为NioSocketChannel
bootstrap.channel(NioSocketChannel.class);
//设置事件处理器
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//msg -> ByteBuf
ByteBuf byteBuf = (ByteBuf)msg;
CharSequence data = byteBuf.getCharSequence(0, byteBuf.readableBytes(), CharsetUtil.UTF_8);
System.out.println("客户端读取到服务端"+ctx.pipeline().channel().remoteAddress().toString()+"的数据==>:"+data);
}
});
}
});
//设置要链接的服务端地址+端口
ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("192.168.235.133", 9000));
//先向服务端发送“hello server”
ByteBuf hello = Unpooled.copiedBuffer("hello server".getBytes());
channelFuture.channel().writeAndFlush(hello);
//设置程序同步
channelFuture.channel().closeFuture().sync();
//程序能执行到这里证明连接已经关闭了
System.out.println("连接关闭了");
}
}
三、netty之server端
3.1 serverModeV1
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import org.junit.Test;
import java.net.InetSocketAddress;
public class MyNetty {
@Test
public void serverModeV1() throws InterruptedException {
//创建事件监听器
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
//NIO版的socket
NioServerSocketChannel nioServerSocketChannel = new NioServerSocketChannel();
//注册到事件循环组中
eventLoopGroup.register(nioServerSocketChannel);
nioServerSocketChannel.pipeline().addLast(new MyAcceptHander().setEventLoopGroup(eventLoopGroup).setChannelHandler(new ChannelInit()));
//设置监听端口
ChannelFuture bind = nioServerSocketChannel.bind(new InetSocketAddress(9000));
bind.sync().channel().closeFuture().sync();
System.out.println("server close....");
}
/**
* 自定义处理接收状态变化的处理器
*/
private class MyAcceptHander extends ChannelInboundHandlerAdapter{
private EventLoopGroup eventLoopGroup;
private ChannelHandler channelHandler;
public MyAcceptHander setEventLoopGroup(EventLoopGroup eventLoopGroup) {
this.eventLoopGroup = eventLoopGroup;
return this;
}
public MyAcceptHander setChannelHandler(ChannelHandler channelHandler) {
this.channelHandler = channelHandler;
return this;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("触发了MyAcceptHander.channelRegistered()");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("触发了MyAcceptHander.channelRead()");
SocketChannel client = (SocketChannel)msg;
ChannelPipeline p = client.pipeline();
p.addLast(this.channelHandler);
// this.eventLoopGroup.register(client);
}
}
/**
* 为啥要有一个inithandler,可以没有,但是MyInHandler就得设计成单例
*/
@ChannelHandler.Sharable
class ChannelInit extends ChannelInboundHandlerAdapter{
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
Channel client = ctx.channel();
ChannelPipeline p = client.pipeline();
p.addLast(new MyInHandler());//2,client::pipeline[ChannelInit,MyInHandler]
ctx.pipeline().remove(this);
}
}
/**
* 就是用户自己实现的,你能说让用户放弃属性的操作吗
* @ChannelHandler.Sharable 不应该被强压给coder
*/
class MyInHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("client registed...");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client active...");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
// CharSequence str = buf.readCharSequence(buf.readableBytes(), CharsetUtil.UTF_8);
CharSequence str = buf.getCharSequence(0,buf.readableBytes(), CharsetUtil.UTF_8);
System.out.println(str);
ctx.writeAndFlush(buf);
}
}
}
3.2 nettyServerModeV2
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import org.junit.Test;
import java.net.InetSocketAddress;
public class MyNetty {
@Test
public void nettyServerModeV2() throws InterruptedException {
//创建事件监听器
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventLoopGroup,eventLoopGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyInHandler());
}
});
ChannelFuture bind = serverBootstrap.bind(9000);
bind.channel().closeFuture().sync();
System.out.println("server close....");
}
/**
* 自定义处理接收状态变化的处理器
*/
private class MyAcceptHander extends ChannelInboundHandlerAdapter{
private EventLoopGroup eventLoopGroup;
private ChannelHandler channelHandler;
public MyAcceptHander setEventLoopGroup(EventLoopGroup eventLoopGroup) {
this.eventLoopGroup = eventLoopGroup;
return this;
}
public MyAcceptHander setChannelHandler(ChannelHandler channelHandler) {
this.channelHandler = channelHandler;
return this;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("触发了MyAcceptHander.channelRegistered()");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("触发了MyAcceptHander.channelRead()");
SocketChannel client = (SocketChannel)msg;
ChannelPipeline p = client.pipeline();
p.addLast(this.channelHandler);
// this.eventLoopGroup.register(client);
}
}
/**
* 为啥要有一个inithandler,可以没有,但是MyInHandler就得设计成单例
*/
@ChannelHandler.Sharable
class ChannelInit extends ChannelInboundHandlerAdapter{
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
Channel client = ctx.channel();
ChannelPipeline p = client.pipeline();
p.addLast(new MyInHandler());//2,client::pipeline[ChannelInit,MyInHandler]
ctx.pipeline().remove(this);
}
}
/**
* 就是用户自己实现的,你能说让用户放弃属性的操作吗
* @ChannelHandler.Sharable 不应该被强压给coder
*/
class MyInHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("client registed...");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client active...");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
// CharSequence str = buf.readCharSequence(buf.readableBytes(), CharsetUtil.UTF_8);
CharSequence str = buf.getCharSequence(0,buf.readableBytes(), CharsetUtil.UTF_8);
System.out.println(str);
ctx.writeAndFlush(buf);
}
}
}