1. public class Const {
  2. public static int DEFAULT_PORT = 12345;
  3. public static String DEFAULT_SERVER_IP = "127.0.0.1";
  4. /*根据输入信息拼接出一个应答信息*/
  5. public static String response(String msg){
  6. return "Hello,"+msg+",Now is "+new java.util.Date(
  7. System.currentTimeMillis()).toString() ;
  8. }
  9. }

服务端编程

  1. public class NioServer {
  2. private static NioServerHandle nioServerHandle;
  3. public static void main(String[] args){
  4. nioServerHandle = new NioServerHandle(DEFAULT_PORT);
  5. new Thread(nioServerHandle,"Server").start();
  6. }
  7. }
  1. /**
  2. * @author
  3. * 类说明:nio通信服务端处理器
  4. */
  5. public class NioServerHandle implements Runnable{
  6. private volatile boolean started;
  7. private ServerSocketChannel serverSocketChannel;
  8. private Selector selector;
  9. /**
  10. * 构造方法
  11. * @param port 指定要监听的端口号
  12. */
  13. public NioServerHandle(int port) {
  14. try {
  15. /*创建选择器的实例*/
  16. selector = Selector.open();
  17. /*创建ServerSocketChannel的实例*/
  18. serverSocketChannel = ServerSocketChannel.open();
  19. /*设置通道为非阻塞模式*/
  20. serverSocketChannel.configureBlocking(false);
  21. /*绑定端口*/
  22. serverSocketChannel.socket().bind(new InetSocketAddress(port));
  23. /*注册事件,表示关心客户端连接*/
  24. serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
  25. started = true;
  26. System.out.println("服务器已启动,端口号:"+port);
  27. } catch (IOException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. @Override
  32. public void run() {
  33. while(started){
  34. try {
  35. /*获取当前有哪些事件*/
  36. selector.select(1000);
  37. /*获取事件的集合*/
  38. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  39. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  40. while(iterator.hasNext()){
  41. SelectionKey key = iterator.next();
  42. /*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。
  43. 如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活
  44. 的键出现,这会导致我们尝试再次处理它。*/
  45. iterator.remove();
  46. handleInput(key);
  47. }
  48. } catch (IOException e) {
  49. e.printStackTrace();
  50. }
  51. }
  52. }
  53. /*处理事件的发生*/
  54. private void handleInput(SelectionKey key) throws IOException {
  55. if(key.isValid()){
  56. /*处理新接入的客户端的请求*/
  57. if(key.isAcceptable()){
  58. /*获取关心当前事件的Channel*/
  59. ServerSocketChannel ssc
  60. = (ServerSocketChannel) key.channel();
  61. /*接受连接*/
  62. SocketChannel sc = ssc.accept();
  63. System.out.println("==========建立连接=========");
  64. sc.configureBlocking(false);
  65. /*关注读事件*/
  66. sc.register(selector,SelectionKey.OP_READ);
  67. }
  68. /*处理对端的发送的数据*/
  69. if(key.isReadable()){
  70. SocketChannel sc = (SocketChannel) key.channel();
  71. /*创建ByteBuffer,开辟一个缓冲区*/
  72. ByteBuffer buffer = ByteBuffer.allocate(1024);
  73. /*从通道里读取数据,然后写入buffer*/
  74. int readBytes = sc.read(buffer);
  75. if(readBytes>0){
  76. /*将缓冲区当前的limit设置为position,position=0,
  77. 用于后续对缓冲区的读取操作*/
  78. buffer.flip();
  79. /*根据缓冲区可读字节数创建字节数组*/
  80. byte[] bytes = new byte[buffer.remaining()];
  81. /*将缓冲区可读字节数组复制到新建的数组中*/
  82. buffer.get(bytes);
  83. String message = new String(bytes,"UTF-8");
  84. System.out.println("服务器收到消息:"+message);
  85. /*处理数据*/
  86. String result = Const.response(message);
  87. /*发送应答消息*/
  88. doWrite(sc,result);
  89. }else if(readBytes<0){
  90. /*取消特定的注册关系*/
  91. key.cancel();
  92. /*关闭通道*/
  93. sc.close();
  94. }
  95. }
  96. }
  97. }
  98. /*发送应答消息*/
  99. private void doWrite(SocketChannel sc,String response) throws IOException {
  100. byte[] bytes = response.getBytes();
  101. ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
  102. buffer.put(bytes);
  103. buffer.flip();
  104. sc.write(buffer);
  105. }
  106. public void stop(){
  107. started = false;
  108. }
  109. }

客户端

  1. public class NioClient {
  2. private static NioClientHandle nioClientHandle;
  3. public static void start(){
  4. nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT);
  5. //nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,8888);
  6. new Thread(nioClientHandle,"client").start();
  7. }
  8. //向服务器发送消息
  9. public static boolean sendMsg(String msg) throws Exception{
  10. nioClientHandle.sendMsg(msg);
  11. return true;
  12. }
  13. public static void main(String[] args) throws Exception {
  14. start();
  15. Scanner scanner = new Scanner(System.in);
  16. while(NioClient.sendMsg(scanner.next()));
  17. }
  18. }
  1. /**
  2. * @author
  3. * 类说明:nio通信客户端处理器
  4. */
  5. public class NioClientHandle implements Runnable{
  6. private String host;
  7. private int port;
  8. private volatile boolean started;
  9. private Selector selector;
  10. private SocketChannel socketChannel;
  11. public NioClientHandle(String ip, int port) {
  12. this.host = ip;
  13. this.port = port;
  14. try {
  15. /*创建选择器的实例*/
  16. selector = Selector.open();
  17. /*创建ServerSocketChannel的实例*/
  18. socketChannel = SocketChannel.open();
  19. /*设置通道为非阻塞模式*/
  20. socketChannel.configureBlocking(false);
  21. started = true;
  22. } catch (IOException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. public void stop(){
  27. started = false;
  28. }
  29. @Override
  30. public void run() {
  31. try{
  32. doConnect();
  33. }catch(IOException e){
  34. e.printStackTrace();
  35. System.exit(1);
  36. }
  37. //循环遍历selector
  38. while(started){
  39. try{
  40. //无论是否有读写事件发生,selector每隔1s被唤醒一次
  41. selector.select(1000);
  42. //获取当前有哪些事件可以使用
  43. Set<SelectionKey> keys = selector.selectedKeys();
  44. //转换为迭代器
  45. Iterator<SelectionKey> it = keys.iterator();
  46. SelectionKey key = null;
  47. while(it.hasNext()){
  48. key = it.next();
  49. /*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。
  50. 如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活
  51. 的键出现,这会导致我们尝试再次处理它。*/
  52. it.remove();
  53. try{
  54. handleInput(key);
  55. }catch(Exception e){
  56. if(key != null){
  57. key.cancel();
  58. if(key.channel() != null){
  59. key.channel().close();
  60. }
  61. }
  62. }
  63. }
  64. }catch(Exception e){
  65. e.printStackTrace();
  66. System.exit(1);
  67. }
  68. }
  69. //selector关闭后会自动释放里面管理的资源
  70. if(selector != null)
  71. try{
  72. selector.close();
  73. }catch (Exception e) {
  74. e.printStackTrace();
  75. }
  76. }
  77. //具体的事件处理方法
  78. private void handleInput(SelectionKey key) throws IOException{
  79. if(key.isValid()){
  80. //获得关心当前事件的channel
  81. SocketChannel sc = (SocketChannel) key.channel();
  82. //连接事件
  83. if(key.isConnectable()){
  84. if(sc.finishConnect()){
  85. socketChannel.register(selector,
  86. SelectionKey.OP_READ);}
  87. else System.exit(1);
  88. }
  89. //有数据可读事件
  90. if(key.isReadable()){
  91. //创建ByteBuffer,并开辟一个1M的缓冲区
  92. ByteBuffer buffer = ByteBuffer.allocate(1024);
  93. //读取请求码流,返回读取到的字节数
  94. int readBytes = sc.read(buffer);
  95. //读取到字节,对字节进行编解码
  96. if(readBytes>0){
  97. //将缓冲区当前的limit设置为position,position=0,
  98. // 用于后续对缓冲区的读取操作
  99. buffer.flip();
  100. //根据缓冲区可读字节数创建字节数组
  101. byte[] bytes = new byte[buffer.remaining()];
  102. //将缓冲区可读字节数组复制到新建的数组中
  103. buffer.get(bytes);
  104. String result = new String(bytes,"UTF-8");
  105. System.out.println("客户端收到消息:" + result);
  106. }
  107. //链路已经关闭,释放资源
  108. else if(readBytes<0){
  109. key.cancel();
  110. sc.close();
  111. }
  112. }
  113. }
  114. }
  115. private void doWrite(SocketChannel channel,String request)
  116. throws IOException {
  117. //将消息编码为字节数组
  118. byte[] bytes = request.getBytes();
  119. //根据数组容量创建ByteBuffer
  120. ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
  121. //将字节数组复制到缓冲区
  122. writeBuffer.put(bytes);
  123. //flip操作
  124. writeBuffer.flip();
  125. //发送缓冲区的字节数组
  126. /*关心事件和读写网络并不冲突*/
  127. channel.write(writeBuffer);
  128. }
  129. private void doConnect() throws IOException{
  130. /*非阻塞的连接*/
  131. if(socketChannel.connect(new InetSocketAddress(host,port))){
  132. socketChannel.register(selector,SelectionKey.OP_READ);
  133. }else{
  134. socketChannel.register(selector,SelectionKey.OP_CONNECT);
  135. }
  136. }
  137. //写数据对外暴露的API
  138. public void sendMsg(String msg) throws Exception{
  139. doWrite(socketChannel, msg);
  140. }
  141. }