Echo服务器
所有的Netty服务器都需要以下两部分。
- 至少一个
ChannelHandler
——该组件实现了服务器对从客户端接收的数据的处理,即它的业务逻辑。 - 引导 ——这是配置服务器的启动代码。至少,它会将服务器绑定到它要监听连接请求的端口上。
Echo服务器会响应传入的消息,所以它需要实现ChannelInboundHandler
接口,用来定义响应入站 事件的方法。继承ChannelInboundHandlerAdapter
类,它提供了ChannelInboundHandler
的默认实现。
EchoServerHandler
channelRead()
——对于每个传入的消息都要调用;channelReadComplete()
——通知ChannelInboundHandler
最后一次对channel-Read()
的调用是当前批量读取中的最后一条消息;exceptionCaught()
——在读取操作期间,有异常抛出时会调用。
ChannelInboundHandlerAdapter
的每个方法都可以被重写。
除了ChannelInboundHandlerAdapter
之外,还有很多需要学习的ChannelHandler
的子类型和实现.
- 针对不同类型的事件来调用
ChannelHandler
;
package com.github.twx;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* ChannelInBoundHandler: 响应入站事件
* ChannelInboundHandlerAdapter提供了ChannelInboundHandler 的默认实现
* @author tangwx@soyuan.com.cn
* @date 2019-05-22 23:51
*/
//标示ChannelHandler可以被多个Channel安全地共享
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
/**
* 对于每个传入的消息都要调用
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("server received: "+byteBuf.toString(CharsetUtil.UTF_8));
//原封不动的发送给客户端
ctx.write(byteBuf);
}
/**
* 对channelRead()的调用是当前批量读取中的最后一条消息
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//将未处理的消息冲刷到远程节点,并且关闭该Channel
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
/**
* 每个Channel都拥有一个关联的ChannelPipeline,其持有一个ChannelHandler的实例链。
* ChannelHandler会把方法的调用转发给链中的下一个Channel-Handler 。
*
* 如果exceptionCaught() 方法没有被实现,
* 那么异常将会被传递到ChannelPipeline的尾端并被记录
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
引导服务器
引导服务器可以概况成两个步骤:
- 绑定到服务器将在其上监听并接受传入连接请求的端口;
- 配置
Channel
,以将有关的入站消息通知给EchoServerHandler
实例。
EchoServerHandler被标注为@Shareable,所以我们可以总是使用同样的实例
package com.github.twx;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
/**
* @author tangwx@soyuan.com.cn
* @date 2019-05-22 23:58
*/
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) {
//设置端口值
int port = Integer.parseInt(args[0]);
new EchoServer(port).start();
}
private void start() {
//这是我们自己实现的ChannelHandler
EchoServerHandler serverHandler = new EchoServerHandler();
//创建EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();
//创建ServerBootstrap
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
//指定所使用的NIO传输Channel
//使用NIO是因为它的可扩展性和彻底的异步性,它是目前使用最广泛的传输
//如果你想要在自己的服务器中使用OIO传输,
//将需要指定OioServerSocketChannel和OioEventLoopGroup 。
.channel(NioServerSocketChannel.class)
//使用指定的端口设置套接字地址
.localAddress(new InetSocketAddress(port))
//添加一个EchoServerHandler到子Channel的ChannelPipeline
//当有一个新的连接进入时,新的子Channel将会被创建
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//EchoServerHandler被标注为@Shareable,所以我们可以总是使用同样的实例
ch.pipeline().addLast(serverHandler);
}
});
try {
//异步绑定服务器:调用sync()方法阻塞等待直到绑定完成
ChannelFuture future = bootstrap.bind().sync();
//获取Channel的CloseFuture,并且阻塞当前线程直到它完成
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
try {
//关闭EventLoopGroup,释放所有的资源
group.shutdownGracefully().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
注意看代码解释。
Echo客户端
扩展SimpleChannelInboundHandler
类处理入站消息。
EchoClientHandler
package com.github.twx;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* @author tangwx@soyuan.com.cn
* @date 2019-05-23 00:18
*/
//标记Sharable的实例可以被多个Channel共享
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
/**
* 在与服务器建立连接之后将被调用
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//建立连接后给服务器发送信息
ctx.writeAndFlush(Unpooled.copiedBuffer("我是客户端,我已与服务器连接...", CharsetUtil.UTF_8));
}
/**
* 当从服务器接收到一条消息时被调用
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("【客户端从服务器收到消息:】 "+msg.toString(CharsetUtil.UTF_8));
}
/**
* 在处理过程中引发异常时被调用。
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
重写了channelRead0()
方法。每当接收数据时,都会调用这个方法。需要注意的是,由服务器发送的消息可能会被分块接收。也就是说,如果服务器发送了5字节,那么不能保证这5字节会被一次性接收。即使是对于这么少量的数据,channelRead0()
方法也可能会被调用两次.
引导客户端
客户端是使用主机和端口参数来连接远程地址.
与服务器引导还是非常类似的,不同的点我已经在代码注释中标明了。
package com.github.twx;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
/**
* @author tangwx@soyuan.com.cn
* @date 2019-05-23 00:12
*/
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public static void main(String[] args) {
new EchoClient(args[0], Integer.parseInt(args[1])).start();
}
private void start() {
//指定EventLoopGroup以处理客户端事件;需要适用于NIO的实现
EventLoopGroup group = new NioEventLoopGroup();
//创建Bootstrap
//服务端是:ServerBootstrap bootstrap = new ServerBootstrap();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
//适用于NIO传输的Channel类型
.channel(NioSocketChannel.class)
//设置服务器的InetSocketAddr-ess
//服务端是localAddress
.remoteAddress(new InetSocketAddress(host,port))
//在创建Channel时,向ChannelPipeline中添加一个EchoClientHandler实例
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//特定的处理器
ch.pipeline().addLast(new EchoClientHandler());
}
});
try {
//连接到远程节点,阻塞等待直到连接完成
//服务器是bind()
ChannelFuture future = bootstrap.connect().sync();
//阻塞,直到Channel关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
try {
//关闭线程池并且释放所有的资源
group.shutdownGracefully().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}