Demo实例
编码步骤:
当客户端连接时,会通过ServerSocketChannel 得到 SocketChannel,Selector 进行监听 select 方法, 返回有事件发生的通道的个数.
将socketChannel注册到Selector上, register(Selector sel, int ops), 一个selector上可以注册多个SocketChannel,注册后返回一个 SelectionKey, 会和该Selector 关联(集合),进一步得到各个 SelectionKey (有事件发生)
在通过 SelectionKey 反向获取 SocketChannel , 方法 channel()
判断该Channel的事件类型,对不同事件进行不同的业务处理
NIO入门案例:实现服务器和客户端的简单通讯
@Testpublic void Server() throws IOException {//创建ServerSocketChannel -> ServerSocketServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//得到一个Selector对象Selector selector = Selector.open();//绑定一个端口6666serverSocketChannel.socket().bind(new InetSocketAddress(6666));//设置非阻塞serverSocketChannel.configureBlocking(false);//把 serverSocketChannel 注册到 selector ,关心事件为:OP_ACCEPT,有新的客户端连接SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println();//循环等待客户端连接while (true) {//等待1秒,如果没有事件发生,就返回if (selector.select(1000) == 0) {System.out.println("服务器等待了1秒,无连接");continue;}//如果返回的 > 0,表示已经获取到关注的事件// 就获取到相关的 selectionKey 集合,反向获取通道Set<SelectionKey> selectionKeys = selector.selectedKeys();//遍历 Set<SelectionKey>,使用迭代器遍历Iterator<SelectionKey> keyIterator = selectionKeys.iterator();while (keyIterator.hasNext()) {//获取到SelectionKeySelectionKey key = keyIterator.next();//根据 key 对应的通道发生的事件,做相应的处理if (key.isAcceptable()) {//如果是 OP_ACCEPT,有新的客户端连接//该客户端生成一个 SocketChannelSocketChannel socketChannel = serverSocketChannel.accept();System.out.println("客户端连接成功,生成了一个SocketChannel:" + socketChannel.hashCode());//将SocketChannel设置为非阻塞socketChannel.configureBlocking(false);//将socketChannel注册到selector,关注事件为 OP_READ,同时给SocketChannel关联一个BuffersocketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));}if (key.isReadable()) {//通过key,反向获取到对应的ChannelSocketChannel channel = (SocketChannel) key.channel();//获取到该channel关联的BufferByteBuffer buffer = (ByteBuffer) key.attachment();channel.read(buffer);System.out.println("from 客户端:" + new String(buffer.array()));}//手动从集合中移除当前的 selectionKey,防止重复操作keyIterator.remove();}}}
@Testpublic void Client() throws IOException {//得到一个网络通道SocketChannel socketChannel = SocketChannel.open();//设置非阻塞socketChannel.configureBlocking(false);//提供服务器端的IP和端口InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 6666);//连接服务器if (!socketChannel.connect(socketAddress)){ //如果不成功while (!socketChannel.finishConnect()){System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作。。。");}}//如果连接成功,就发送数据String str = "hello, 尚硅谷";ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());//发送数据,实际上就是将buffer数据写入到channelsocketChannel.write(byteBuffer);System.in.read();}
群聊系统Demo
需要实现客户端和服务器端之间的数据通讯,服务端能够将数据转发给其他所有客户端。
/********************服务端********************/public class GroupChatServer {//定义属性private Selector selector;private ServerSocketChannel listenChannel;private static final int PORT = 6666;//构造器//初始化工作public GroupChatServer() {try {//得到选择器selector = Selector.open();listenChannel = ServerSocketChannel.open();//绑定端口listenChannel.socket().bind(new InetSocketAddress(PORT));//设置非阻塞模式listenChannel.configureBlocking(false);//将listenChannel注册到selector,绑定监听事件listenChannel.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}//监听public void listen() {try {//循环处理while (true) {int count = selector.select();if (count > 0) { //有事件处理//遍历得到SelectionKey集合Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {//取出SelectionKeySelectionKey key = iterator.next();//监听到accept,连接事件if (key.isAcceptable()) {SocketChannel socketChannel = listenChannel.accept();//将该channel设置非阻塞并注册到selectorsocketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);//提示System.out.println(socketChannel.getRemoteAddress() + " 上线...");}if (key.isReadable()) { //通道可以读取数据,即server端收到客户端的消息,//处理读(专门写方法)readData(key);}iterator.remove();}} else {System.out.println("等待。。。");}}} catch (Exception e) {e.printStackTrace();}}//读取客户端消息private void readData(SelectionKey key) {//定义一个SocketChannelSocketChannel channel = null;try {//取到关联的channelchannel = (SocketChannel) key.channel();//创建缓冲bufferByteBuffer buffer = ByteBuffer.allocate(1024);int count = channel.read(buffer);//根据count值判断是否读取到数据if (count > 0) {//把缓冲区的数据转成字符串String msg = new String(buffer.array());//输出该消息System.out.println("from 客户端:" + msg);//向其他的客户端转发消息(去掉自己),专门写一个方法处理sendInfoToOtherClients(msg, channel);}} catch (IOException e) {try {System.out.println(channel.getRemoteAddress() + "离线了...");//取消注册key.cancel();//关闭通道channel.close();} catch (IOException ex) {ex.printStackTrace();}}}//转发消息给其他客户端(通道)private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {System.out.println("服务器转发消息中。。。");//遍历 所有注册到selector上的SocketChannel,并排除selffor (SelectionKey key : selector.keys()) {//通过key取出对应的SocketChannelChannel targetChannel = key.channel();//排除自己if (targetChannel instanceof SocketChannel && targetChannel != self) {//转型SocketChannel dest = (SocketChannel) targetChannel;//将msg,存储到bufferByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());//将buffer的数据写入通道dest.write(buffer);}}}public static void main(String[] args) {//创建服务器对象GroupChatServer groupChatServer = new GroupChatServer();groupChatServer.listen();}}
/*****************************客户端**********************/public class GroupChatClient {//定义相关的属性private static final String HOST = "127.0.0.1"; //服务器的IP地址private static final int PORT = 6666; //服务器端口private Selector selector;private SocketChannel socketChannel;private String username;//构造器,初始化操作public GroupChatClient() throws IOException {selector = Selector.open();//连接服务器socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));//设置非阻塞socketChannel.configureBlocking(false);//将channel注册到selectorsocketChannel.register(selector, SelectionKey.OP_READ);//得到usernameusername = socketChannel.getLocalAddress().toString().substring(1);System.out.println(username + " is ok...");}//向服务器发送消息public void sendInfo(String info){info = username + " 说:" + info;try {socketChannel.write(ByteBuffer.wrap(info.getBytes()));}catch (IOException e){e.printStackTrace();}}//读取从服务器端回复的消息public void readInfo(){try {int readChannels = selector.select();if (readChannels > 0){//有可用的通道Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()){SelectionKey key = iterator.next();if (key.isReadable()){//得到相关的通道SocketChannel sc = (SocketChannel)key.channel();//得到一个bufferByteBuffer buf = ByteBuffer.allocate(1024);//读取sc.read(buf);//把缓冲区的数据转成字符串String msg = new String(buf.array());System.out.println(msg.trim());}}}else {System.out.println("没有可以用的通道...");}}catch (Exception e){}}public static void main(String[] args) throws IOException {//启动客户端GroupChatClient chatClient = new GroupChatClient();//启动一个线程用于读取服务器的消息new Thread(() -> {while (true){chatClient.readInfo();try {Thread.sleep(3000);}catch (InterruptedException e){e.printStackTrace();}}}).start();//主线程用于发送数据给服务器端Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()) {String s = scanner.nextLine();chatClient.sendInfo(s);}}}
注意事项:
使用int read = channel.read(buffer)读取数据时,读取的结果情况:
当read=-1时,说明客户端的数据发送完毕,并且主动的关闭socket。所以这种情况下,服务器程序需要关闭socketSocket,并且取消key的注册。注意:这个时候继续使用SocketChannel进行读操作的话,就会抛出:==远程主机强迫关闭一个现有的连接==的IO异常
当read=0时:
某一时刻SocketChannel中当前没有数据可读。
客户端的数据发送完毕。
详情见此博文
但是对于博文中的这一条,经过本人测试,这种情况下返回的是读取的数据的大小,而不是0:ByteBuffer的position等于limit,这个时候也会返回0
