整个TimeServer的代码较多,不过逻辑应该是比较清楚的,还加了注释。

    1. public class NIOTimeServer {
    2. //默认端口号
    3. private static final int PORT = 8585;
    4. public static void main(String[] args) throws IOException {
    5. new Thread(new TimeServerTask(PORT)).start();
    6. }
    7. private static class TimeServerTask implements Runnable{
    8. private Selector selector;
    9. private ServerSocketChannel serverSocketChannel;
    10. //停止channel 标识
    11. private volatile boolean stop = false;
    12. /**
    13. * 初始化channel 监听端口 初始化selector
    14. */
    15. public TimeServerTask(int port) {
    16. try {
    17. //初始化通道和选择器
    18. selector = Selector.open();
    19. serverSocketChannel = ServerSocketChannel.open();
    20. //设置为非阻塞模式
    21. serverSocketChannel.configureBlocking(false);
    22. //绑定端口
    23. serverSocketChannel.bind(new InetSocketAddress(port));
    24. //将通道注册到选择器
    25. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    26. System.out.println("The TimeServer started at port: " + port);
    27. } catch (IOException e) {
    28. e.printStackTrace();
    29. }
    30. }
    31. /**
    32. * 停止
    33. */
    34. public void stop(){
    35. this.stop = true;
    36. }
    37. @Override
    38. public void run() {
    39. while(!stop){
    40. try{
    41. //这里使用带有时间限制的阻塞select方法
    42. selector.select(1000);
    43. //获取就绪通道
    44. Set<SelectionKey> selectionKeys = selector.selectedKeys();
    45. //遍历就绪通道
    46. Iterator<SelectionKey> iterator = selectionKeys.iterator();
    47. SelectionKey selectionKey;
    48. while(iterator.hasNext()){
    49. selectionKey = iterator.next();
    50. iterator.remove();
    51. try{
    52. handleInput(selectionKey);
    53. }catch (IOException e){
    54. selectionKey.cancel();
    55. if(null != selectionKey.channel()){
    56. selectionKey.channel().close();
    57. }
    58. }
    59. }
    60. }catch (Throwable t){
    61. t.printStackTrace();
    62. }
    63. }
    64. //停止 就关闭selector selector关闭后 所有注册到它上面的channel也会被关闭
    65. if(null != selector){
    66. try {
    67. selector.close();
    68. } catch (IOException e) {
    69. e.printStackTrace();
    70. }
    71. }
    72. }
    1. /**
    2. * 处理输入
    3. */
    4. private void handleInput(SelectionKey selectionKey) throws IOException {
    5. //先使用isValid方法判断是否可用
    6. if(selectionKey.isValid()){
    7. //判断是否可接收连接
    8. if(selectionKey.isAcceptable()){
    9. //从SelectionKey中拿到与之对应的ServerChannel
    10. ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
    11. //建立连接
    12. SocketChannel socketChannel = channel.accept();
    13. //非阻塞模式
    14. socketChannel.configureBlocking(false);
    15. //将socketChannel也注册到selector
    16. socketChannel.register(selector,SelectionKey.OP_READ);
    17. }
    18. if(selectionKey.isReadable()){
    19. //读取数据
    20. SocketChannel channel = (SocketChannel) selectionKey.channel();
    21. ByteBuffer buffer = ByteBuffer.allocate(1025-1);
    22. //因为channel是非阻塞的 所以read方法可能返回-1 0 以及正常读取到的字节数量
    23. int read = channel.read(buffer);
    24. if(read>0){
    25. //回绕缓冲区 便于获取有效元素
    26. buffer.flip();
    27. byte[] elements = new byte[buffer.remaining()];
    28. //将buffer中的数据写入数组
    29. buffer.get(elements);
    30. String content = new String(elements, StandardCharsets.UTF_8);
    31. System.out.println("The TimeServer received order: " + content );
    32. String response = "QUERY TIME ORDER ".equalsIgnoreCase(content)?new Date(System.currentTimeMillis()).toString():"BAD ORDER";
    33. //写回响应
    34. doWrite(response,channel);
    35. }else if(read<0){
    36. //对端链路关闭
    37. selectionKey.cancel();
    38. channel.close();
    39. }else {
    40. //读到0字节 忽略
    41. }
    42. }
    43. }
    44. }
    45. private void doWrite(String response,SocketChannel socketChannel) throws IOException {
    46. if(response!=null && response.trim().length()>0){
    47. socketChannel.write(ByteBuffer.wrap(response.getBytes(StandardCharsets.UTF_8)));
    48. }
    49. }
    50. }
    51. }

    这样我们启动main线程,就会启动这个TimeServer。