服务端

  1. public class SelectServer {
  2. private static Executor executor = Executors.newFixedThreadPool(10);
  3. public static void main(String[] args) throws Exception{
  4. // 打开服务端通道
  5. ServerSocketChannel ssc = ServerSocketChannel.open();
  6. // 打开选择器
  7. Selector selector = Selector.open();
  8. // 服务端通过到绑定到端口
  9. ssc.socket().bind(new InetSocketAddress(8022));
  10. // 配置为非阻塞
  11. ssc.configureBlocking(false);
  12. // 将服务端通道的连接事件注册到选择器上
  13. // 与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道都可以
  14. ssc.register(selector, SelectionKey.OP_ACCEPT);
  15. // 开始监听
  16. while (true){
  17. // 100ms内无事件发生
  18. if (selector.select(100) == 0){
  19. continue;
  20. }
  21. // 选择器获取100ms内发生的事件
  22. Iterator<SelectionKey> iterable = selector.selectedKeys().iterator();
  23. while (iterable.hasNext()){
  24. // 处理事件
  25. SelectionKey key = iterable.next();
  26. // 监听连接事件
  27. if (key.isAcceptable()){
  28. handleAcceptable(key);
  29. }
  30. // 监听连接事件
  31. if (key.isReadable()){
  32. handleReadable(key);
  33. }
  34. // 监听连接事件
  35. if (key.isWritable() && key.isValid()){
  36. handleWritable(key);
  37. }
  38. // 监听连接事件
  39. if (key.isConnectable()){
  40. System.out.println("isConnectable = true");
  41. }
  42. iterable.remove();
  43. }
  44. }
  45. }
  46. private static void handleWritable(SelectionKey key){
  47. // 另起worker线程处理实际工作
  48. executor.execute(new Runnable() {
  49. @Override
  50. public void run() {
  51. try {
  52. // 获取客户端通道
  53. SocketChannel sc = (SocketChannel) key.channel();
  54. // 获取服务端准备发送附件内容
  55. ByteBuffer buf = (ByteBuffer)key.attachment();
  56. buf.flip();
  57. while(buf.hasRemaining()){
  58. sc.write(buf);
  59. }
  60. buf.compact();
  61. }catch (Exception e){
  62. }
  63. }
  64. });
  65. }
  66. private static void handleReadable(SelectionKey key){
  67. // 另起worker线程处理实际工作
  68. executor.execute(new Runnable() {
  69. @Override
  70. public void run() {
  71. try {
  72. // 获取客户端通道
  73. SocketChannel sc = (SocketChannel)key.channel();
  74. // 获取客户端上传附件信息
  75. ByteBuffer buf = (ByteBuffer)key.attachment();
  76. // 读取内容
  77. long bytesRead = sc.read(buf);
  78. while(bytesRead>0){
  79. buf.flip();
  80. while(buf.hasRemaining()){
  81. System.out.print((char)buf.get());
  82. }
  83. System.out.println();
  84. buf.clear();
  85. bytesRead = sc.read(buf);
  86. }
  87. if(bytesRead == -1){
  88. sc.close();
  89. }
  90. }catch (Exception e){
  91. }
  92. }
  93. });
  94. }
  95. private static void handleAcceptable(SelectionKey key){
  96. // 另起worker线程处理实际工作
  97. executor.execute(new Runnable() {
  98. @Override
  99. public void run() {
  100. try {
  101. // 拿到事件关联的通道
  102. ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel();
  103. // 拿到客户端
  104. SocketChannel sc = ssChannel.accept();
  105. // 配置为非阻塞
  106. sc.configureBlocking(false);
  107. // 将客户端通道的可读事件注册到选择器上
  108. sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(1024));
  109. }catch (Exception e){
  110. }
  111. }
  112. });
  113. }
  114. }

客户端

  1. public class SelectClient {
  2. public static void main(String[] args) throws Exception {
  3. ByteBuffer buffer = ByteBuffer.allocate(1024);
  4. SocketChannel socketChannel = null;
  5. try {
  6. socketChannel = SocketChannel.open();
  7. socketChannel.configureBlocking(false);
  8. socketChannel.connect(new InetSocketAddress("127.0.0.1", 8022));
  9. if (socketChannel.finishConnect()) {
  10. int i = 0;
  11. while (true) {
  12. TimeUnit.SECONDS.sleep(1);
  13. String info = "I'm " + i++ + "-th information from client";
  14. buffer.clear();
  15. buffer.put(info.getBytes());
  16. buffer.flip();
  17. while (buffer.hasRemaining()) {
  18. System.out.println(buffer);
  19. socketChannel.write(buffer);
  20. }
  21. }
  22. }
  23. } catch (IOException | InterruptedException e) {
  24. e.printStackTrace();
  25. } finally {
  26. try {
  27. if (socketChannel != null) {
  28. socketChannel.close();
  29. }
  30. } catch (IOException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. }
  35. }