服务端
public class SelectServer {
private static Executor executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws Exception{
// 打开服务端通道
ServerSocketChannel ssc = ServerSocketChannel.open();
// 打开选择器
Selector selector = Selector.open();
// 服务端通过到绑定到端口
ssc.socket().bind(new InetSocketAddress(8022));
// 配置为非阻塞
ssc.configureBlocking(false);
// 将服务端通道的连接事件注册到选择器上
// 与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道都可以
ssc.register(selector, SelectionKey.OP_ACCEPT);
// 开始监听
while (true){
// 100ms内无事件发生
if (selector.select(100) == 0){
continue;
}
// 选择器获取100ms内发生的事件
Iterator<SelectionKey> iterable = selector.selectedKeys().iterator();
while (iterable.hasNext()){
// 处理事件
SelectionKey key = iterable.next();
// 监听连接事件
if (key.isAcceptable()){
handleAcceptable(key);
}
// 监听连接事件
if (key.isReadable()){
handleReadable(key);
}
// 监听连接事件
if (key.isWritable() && key.isValid()){
handleWritable(key);
}
// 监听连接事件
if (key.isConnectable()){
System.out.println("isConnectable = true");
}
iterable.remove();
}
}
}
private static void handleWritable(SelectionKey key){
// 另起worker线程处理实际工作
executor.execute(new Runnable() {
@Override
public void run() {
try {
// 获取客户端通道
SocketChannel sc = (SocketChannel) key.channel();
// 获取服务端准备发送附件内容
ByteBuffer buf = (ByteBuffer)key.attachment();
buf.flip();
while(buf.hasRemaining()){
sc.write(buf);
}
buf.compact();
}catch (Exception e){
}
}
});
}
private static void handleReadable(SelectionKey key){
// 另起worker线程处理实际工作
executor.execute(new Runnable() {
@Override
public void run() {
try {
// 获取客户端通道
SocketChannel sc = (SocketChannel)key.channel();
// 获取客户端上传附件信息
ByteBuffer buf = (ByteBuffer)key.attachment();
// 读取内容
long bytesRead = sc.read(buf);
while(bytesRead>0){
buf.flip();
while(buf.hasRemaining()){
System.out.print((char)buf.get());
}
System.out.println();
buf.clear();
bytesRead = sc.read(buf);
}
if(bytesRead == -1){
sc.close();
}
}catch (Exception e){
}
}
});
}
private static void handleAcceptable(SelectionKey key){
// 另起worker线程处理实际工作
executor.execute(new Runnable() {
@Override
public void run() {
try {
// 拿到事件关联的通道
ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel();
// 拿到客户端
SocketChannel sc = ssChannel.accept();
// 配置为非阻塞
sc.configureBlocking(false);
// 将客户端通道的可读事件注册到选择器上
sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(1024));
}catch (Exception e){
}
}
});
}
}
客户端
public class SelectClient {
public static void main(String[] args) throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = null;
try {
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8022));
if (socketChannel.finishConnect()) {
int i = 0;
while (true) {
TimeUnit.SECONDS.sleep(1);
String info = "I'm " + i++ + "-th information from client";
buffer.clear();
buffer.put(info.getBytes());
buffer.flip();
while (buffer.hasRemaining()) {
System.out.println(buffer);
socketChannel.write(buffer);
}
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
if (socketChannel != null) {
socketChannel.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}