NioClient

  1. public class NioClient {
  2. private static NioClientHandle nioClientHandle;
  3. public static void start(){
  4. nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT);
  5. new Thread(nioClientHandle,"Server").start();
  6. }
  7. //向服务器发送消息
  8. public static boolean sendMsg(String msg) throws Exception{
  9. nioClientHandle.sendMsg(msg);
  10. return true;
  11. }
  12. public static void main(String[] args) throws Exception {
  13. start();
  14. Scanner scanner = new Scanner(System.in);
  15. while(NioClient.sendMsg(scanner.next())) {
  16. }
  17. }
  18. }

NioClientHandle

  1. public class NioClientHandle implements Runnable{
  2. private String host;
  3. private int port;
  4. private volatile boolean started;
  5. private Selector selector;
  6. private SocketChannel socketChannel;
  7. public NioClientHandle(String ip, int port) {
  8. this.host = ip;
  9. this.port = port;
  10. try {
  11. selector = Selector.open();
  12. socketChannel = SocketChannel.open();
  13. socketChannel.configureBlocking(false);
  14. started = true;
  15. } catch (Exception e){
  16. e.printStackTrace();
  17. }
  18. }
  19. public void stop(){
  20. started = false;
  21. }
  22. @Override
  23. public void run() {
  24. try{
  25. doConnect();
  26. }catch(IOException e){
  27. e.printStackTrace();
  28. System.exit(1);
  29. }
  30. //循环遍历selector
  31. while(started){
  32. try{
  33. //无论是否有读写事件发生,selector每隔1s被唤醒一次
  34. selector.select(1000);
  35. //获取当前有哪些事件可以使用
  36. Set<SelectionKey> keys = selector.selectedKeys();
  37. //转换为迭代器
  38. Iterator<SelectionKey> it = keys.iterator();
  39. SelectionKey key = null;
  40. while(it.hasNext()){
  41. key = it.next();
  42. /*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。
  43. 如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活
  44. 的键出现,这会导致我们尝试再次处理它。*/
  45. it.remove();
  46. try{
  47. handleInput(key);
  48. }catch(Exception e){
  49. if(key != null){
  50. key.cancel();
  51. if(key.channel() != null){
  52. key.channel().close();
  53. }
  54. }
  55. }
  56. }
  57. }catch(Exception e){
  58. e.printStackTrace();
  59. System.exit(1);
  60. }
  61. }
  62. //selector关闭后会自动释放里面管理的资源
  63. if(selector != null) {
  64. try{
  65. selector.close();
  66. }catch (Exception e) {
  67. e.printStackTrace();
  68. }
  69. }
  70. }
  71. // 具体的事件处理方法
  72. private void handleInput(SelectionKey key) throws IOException{
  73. if(key.isValid()){
  74. //获得关心当前事件的channel
  75. SocketChannel sc = (SocketChannel) key.channel();
  76. //连接事件
  77. if(key.isConnectable()){
  78. if(sc.finishConnect()){socketChannel.register(selector,
  79. SelectionKey.OP_READ);}
  80. else {
  81. System.exit(1);
  82. }
  83. }
  84. //有数据可读事件
  85. if(key.isReadable()){
  86. //创建ByteBuffer,并开辟一个1M的缓冲区
  87. ByteBuffer buffer = ByteBuffer.allocate(1024);
  88. //读取请求码流,返回读取到的字节数
  89. int readBytes = sc.read(buffer);
  90. //读取到字节,对字节进行编解码
  91. if(readBytes>0){
  92. //将缓冲区当前的limit设置为position,position=0,
  93. // 用于后续对缓冲区的读取操作
  94. buffer.flip();
  95. //根据缓冲区可读字节数创建字节数组
  96. byte[] bytes = new byte[buffer.remaining()];
  97. //将缓冲区可读字节数组复制到新建的数组中
  98. buffer.get(bytes);
  99. String result = new String(bytes,"UTF-8");
  100. System.out.println("客户端收到消息:" + result);
  101. }
  102. //链路已经关闭,释放资源
  103. else if(readBytes<0){
  104. key.cancel();
  105. sc.close();
  106. }
  107. }
  108. }
  109. }
  110. private void doWrite(SocketChannel channel,String request)
  111. throws IOException {
  112. //将消息编码为字节数组
  113. byte[] bytes = request.getBytes();
  114. //根据数组容量创建ByteBuffer
  115. ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
  116. //将字节数组复制到缓冲区
  117. writeBuffer.put(bytes);
  118. //flip操作
  119. writeBuffer.flip();
  120. //发送缓冲区的字节数组
  121. /*关心事件和读写网络并不冲突*/
  122. channel.write(writeBuffer);
  123. }
  124. private void doConnect() throws IOException{
  125. if(socketChannel.connect(new InetSocketAddress(host,port))){
  126. socketChannel.register(selector, SelectionKey.OP_READ);
  127. }else{
  128. socketChannel.register(selector, SelectionKey.OP_CONNECT);
  129. }
  130. }
  131. //写数据对外暴露的API
  132. public void sendMsg(String msg) throws Exception{
  133. doWrite(socketChannel, msg);
  134. }
  135. }