NioServer

  1. public class NioServer {
  2. private static NioServerHandle nioServerHandle;
  3. public static void main(String[] args){
  4. nioServerHandle = new NioServerHandle(DEFAULT_PORT);
  5. new Thread(nioServerHandle,"Server").start();
  6. }
  7. }

NioServerHandle

  1. public class NioServerHandle implements Runnable{
  2. private volatile boolean started;
  3. private ServerSocketChannel serverSocketChannel;
  4. private Selector selector;
  5. /**
  6. * 构造方法
  7. * @param port 指定要监听的端口号
  8. */
  9. public NioServerHandle(int port) {
  10. try {
  11. /*创建选择器的实例*/
  12. selector = Selector.open();
  13. /*创建ServerSocketChannel的实例*/
  14. serverSocketChannel = ServerSocketChannel.open();
  15. /*设置通道为非阻塞模式 只有非阻塞模式才能注册事件*/
  16. serverSocketChannel.configureBlocking(false);
  17. /*绑定端口*/
  18. serverSocketChannel.socket().bind(new InetSocketAddress(port));
  19. /*注册事件,表示关心客户端连接*/
  20. serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
  21. started = true;
  22. System.out.println("服务器已启动,端口号:"+port);
  23. } catch (IOException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. @Override
  28. public void run() {
  29. while(started){
  30. try {
  31. /*获取当前有哪些事件*/
  32. selector.select(1000);
  33. /*获取事件的集合*/
  34. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  35. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  36. while(iterator.hasNext()){
  37. SelectionKey key = iterator.next();
  38. /*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。
  39. 如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活
  40. 的键出现,这会导致我们尝试再次处理它。*/
  41. iterator.remove();
  42. handleInput(key);
  43. }
  44. } catch (IOException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. }
  49. /*处理事件的发生*/
  50. private void handleInput(SelectionKey key) throws IOException {
  51. if(key.isValid()){
  52. /*处理新接入的客户端的请求*/
  53. if(key.isAcceptable()){
  54. /*通过SelectionKey拿到它关联的ServerSocketChannel*/
  55. ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
  56. /*接受连接 通过ServerSocketChannel创建一个SocketChannel 来处理请求 */
  57. SocketChannel sc = ssc.accept();
  58. System.out.println("==========建立连接=========");
  59. sc.configureBlocking(false);
  60. /*关注读事件*/
  61. sc.register(selector,SelectionKey.OP_READ);
  62. }
  63. /*处理对端的发送的数据*/
  64. if(key.isReadable()){
  65. SocketChannel sc = (SocketChannel) key.channel();
  66. /*创建ByteBuffer,开辟一个缓冲区*/
  67. ByteBuffer buffer = ByteBuffer.allocate(1024);
  68. /*从通道里读取数据,然后写入buffer*/
  69. int readBytes = sc.read(buffer);
  70. if(readBytes>0){
  71. /*将缓冲区当前的limit设置为position,position=0,
  72. 用于后续对缓冲区的读取操作*/
  73. buffer.flip();
  74. /*根据缓冲区可读字节数创建字节数组*/
  75. byte[] bytes = new byte[buffer.remaining()];
  76. /*将缓冲区可读字节数组复制到新建的数组中*/
  77. buffer.get(bytes);
  78. String message = new String(bytes,"UTF-8");
  79. System.out.println("服务器收到消息:"+message);
  80. /*处理数据*/
  81. String result = Const.response(message);
  82. /*发送应答消息*/
  83. doWrite(sc,result);
  84. }else if(readBytes<0){
  85. /*取消特定的注册关系*/
  86. key.cancel();
  87. /*关闭通道*/
  88. sc.close();
  89. }
  90. }
  91. }
  92. }
  93. /*发送应答消息*/
  94. private void doWrite(SocketChannel sc,String response) throws IOException {
  95. byte[] bytes = response.getBytes();
  96. ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
  97. buffer.put(bytes);
  98. buffer.flip();
  99. sc.write(buffer);
  100. }
  101. public void stop(){
  102. started = false;
  103. }
  104. }

应答信息

  1. public class Const {
  2. public static int DEFAULT_PORT = 12345;
  3. public static String DEFAULT_SERVER_IP = "127.0.0.1";
  4. /*根据输入信息拼接出一个应答信息*/
  5. public static String response(String msg){
  6. return "Hello,"+msg+",Now is "+new java.util.Date(
  7. System.currentTimeMillis()).toString() ;
  8. }
  9. }

NioServerWritable

  1. public class NioServerWritable {
  2. private static NioServerHandleWriteable nioServerHandle;
  3. public static void start(){
  4. }
  5. public static void main(String[] args){
  6. nioServerHandle = new NioServerHandleWriteable(DEFAULT_PORT);
  7. new Thread(nioServerHandle,"Server").start();
  8. }
  9. }

NioServerHandleWriteable

  1. public class NioServerHandleWriteable implements Runnable{
  2. private Selector selector;
  3. private ServerSocketChannel serverChannel;
  4. private volatile boolean started;
  5. /**
  6. * 构造方法
  7. * @param port 指定要监听的端口号
  8. */
  9. public NioServerHandleWriteable(int port) {
  10. try{
  11. //创建选择器
  12. selector = Selector.open();
  13. //打开监听通道
  14. serverChannel = ServerSocketChannel.open();
  15. //如果为 true,则此通道将被置于阻塞模式;
  16. // 如果为 false,则此通道将被置于非阻塞模式
  17. serverChannel.configureBlocking(false);//开启非阻塞模式
  18. //绑定端口 backlog设为1024
  19. serverChannel.socket()
  20. .bind(new InetSocketAddress(port),1024);
  21. //监听客户端连接请求
  22. serverChannel.register(selector, SelectionKey.OP_ACCEPT);
  23. //标记服务器已开启
  24. started = true;
  25. System.out.println("服务器已启动,端口号:" + port);
  26. }catch(IOException e){
  27. e.printStackTrace();
  28. System.exit(1);
  29. }
  30. }
  31. @Override
  32. public void run() {
  33. //循环遍历selector
  34. while(started){
  35. try{
  36. //阻塞,只有当至少一个注册的事件发生的时候才会继续.
  37. selector.select();
  38. Set<SelectionKey> keys = selector.selectedKeys();
  39. Iterator<SelectionKey> it = keys.iterator();
  40. SelectionKey key = null;
  41. while(it.hasNext()){
  42. key = it.next();
  43. it.remove();
  44. try{
  45. handleInput(key);
  46. }catch(Exception e){
  47. if(key != null){
  48. key.cancel();
  49. if(key.channel() != null){
  50. key.channel().close();
  51. }
  52. }
  53. }
  54. }
  55. }catch(Throwable t){
  56. t.printStackTrace();
  57. }
  58. }
  59. //selector关闭后会自动释放里面管理的资源
  60. if(selector != null) {
  61. try{
  62. selector.close();
  63. }catch (Exception e) {
  64. e.printStackTrace();
  65. }
  66. }
  67. }
  68. private void handleInput(SelectionKey key) throws IOException{
  69. System.out.println("当前通道事件有:" + key.interestOps());
  70. if(key.isValid()){
  71. //处理新接入的请求消息
  72. if(key.isAcceptable()){
  73. //获得关心当前事件的channel
  74. ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
  75. //通过ServerSocketChannel的accept创建SocketChannel实例
  76. //完成该操作意味着完成TCP三次握手,TCP物理链路正式建立
  77. SocketChannel sc = ssc.accept();
  78. System.out.println("======socket channel 建立连接=======");
  79. //设置为非阻塞的
  80. sc.configureBlocking(false);
  81. //连接已经完成了,可以开始关心读事件了
  82. sc.register(selector, SelectionKey.OP_READ);
  83. }
  84. //读消息
  85. if(key.isReadable()){
  86. System.out.println("======socket channel 数据准备完成," +
  87. "可以去读==读取=======");
  88. SocketChannel sc = (SocketChannel) key.channel();
  89. //创建ByteBuffer,并开辟一个1M的缓冲区
  90. ByteBuffer buffer = ByteBuffer.allocate(1024);
  91. //读取请求码流,返回读取到的字节数
  92. int readBytes = sc.read(buffer);
  93. //读取到字节,对字节进行编解码
  94. if(readBytes>0){
  95. //将缓冲区当前的limit设置为position,position=0,
  96. // 用于后续对缓冲区的读取操作
  97. buffer.flip();
  98. //根据缓冲区可读字节数创建字节数组
  99. byte[] bytes = new byte[buffer.remaining()];
  100. //将缓冲区可读字节数组复制到新建的数组中
  101. buffer.get(bytes);
  102. String message = new String(bytes,"UTF-8");
  103. System.out.println("服务器收到消息:" + message);
  104. //处理数据
  105. String result = response(message) ;
  106. //发送应答消息
  107. doWrite(sc,result);
  108. }
  109. //链路已经关闭,释放资源
  110. else if(readBytes<0){
  111. key.cancel();
  112. sc.close();
  113. }
  114. }
  115. if(key.isWritable()){
  116. SocketChannel sc = (SocketChannel) key.channel();
  117. ByteBuffer buffer = (ByteBuffer) key.attachment();
  118. if(buffer.hasRemaining()){
  119. int count = sc.write(buffer);
  120. System.out.println("write:" + count + "byte,remaining: " + buffer.remaining());
  121. } else{
  122. //注销写事件,只关注读事件
  123. key.interestOps(SelectionKey.OP_READ);
  124. }
  125. }
  126. }
  127. }
  128. //发送应答消息
  129. private void doWrite(SocketChannel channel,String response)
  130. throws IOException {
  131. //将消息编码为字节数组
  132. byte[] bytes = response.getBytes();
  133. //根据数组容量创建ByteBuffer
  134. ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
  135. //将字节数组复制到缓冲区
  136. writeBuffer.put(bytes);
  137. //flip操作
  138. writeBuffer.flip();
  139. //注册写事件 这时候应该既关注写也关注读
  140. channel.register(selector,SelectionKey.OP_WRITE | SelectionKey.OP_READ,writeBuffer);
  141. }
  142. public void stop(){
  143. started = false;
  144. }
  145. }