例子
/**
 * Minimal single-threaded NIO server used to illustrate I/O multiplexing.
 *
 * <p>One {@link Selector} watches the listening channel and every accepted client.
 * For each client the flow is: wait until readable, read a message, queue an
 * acknowledgement, wait until writable, flush the queue, go back to waiting for reads.
 */
public class TestIO {
    /** Replies waiting to be written, keyed by the client channel they belong to. */
    private static final Map<SelectableChannel, Queue<ByteBuffer>> channelToPendingWrites = new ConcurrentHashMap<>();
    private static final byte[] ACK = "Data logged successfully\n".getBytes();
    /** Fallback payload sent when a channel becomes writable with nothing queued. */
    private static final byte[] GREETING = "hello,world!".getBytes();

    /**
     * Starts the server on localhost:9998 and runs the event loop forever.
     *
     * @param args unused
     * @throws IOException if the server socket or the selector cannot be set up,
     *                     or if a select operation itself fails
     */
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // Roughly the counterpart of: int kqfd = kqueue()
        final Selector selector = Selector.open();
        // Counterpart of setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, ...)
        serverSocketChannel.socket().setReuseAddress(true);
        // bind(fd)
        serverSocketChannel.socket().bind(new InetSocketAddress("localhost", 9998));
        // Non-blocking mode; passing true would keep the channel blocking.
        // Counterpart of fcntl(fd, F_SETFL, flags | O_NONBLOCK)
        serverSocketChannel.configureBlocking(false);
        // The third argument is attached to the SelectionKey and read back in handleAccept.
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel);
        System.out.println("-- server ready");
        while (true) {
            // select(Consumer, long) only exists since JDK 11.
            selector.select(key -> dispatch(selector, key), 10000L);
        }
    }

    /** Routes one ready key to the matching handler and drops the client if the handler fails. */
    private static void dispatch(Selector selector, SelectionKey key) {
        if (!key.isValid()) {
            return;
        }
        try {
            if (key.isAcceptable()) {
                handleAccept(selector, key);
            } else if (key.isReadable()) {
                handleRead(key);
            } else if (key.isWritable()) {
                handleWrite(key);
            }
        } catch (IOException | RuntimeException e) {
            e.printStackTrace();
            // A broken client connection must be released, otherwise its key keeps firing.
            // The listening channel is left alone so the server keeps accepting.
            if (key.channel() instanceof SocketChannel) {
                closeClient(key);
            }
        }
    }

    /** Accepts a pending connection and starts watching it for incoming data. */
    private static void handleAccept(Selector selector, SelectionKey key) throws IOException {
        // The listening channel was attached to the key when it was registered.
        final ServerSocketChannel server = (ServerSocketChannel) key.attachment();
        // Corresponds to the socket returned by serverSocket.accept().
        SocketChannel client = server.accept();
        if (client == null) { // accept() may return null in non-blocking mode
            return;
        }
        System.out.printf("accepted connection from %s\n", client.getRemoteAddress());
        // The client channel has to be non-blocking as well.
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
    }

    /** Reads one message, queues the acknowledgement and switches the key to write interest. */
    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read = client.read(buffer);
        if (read == -1) {
            // Peer closed the connection: release the channel instead of polling it forever.
            closeClient(key);
            return;
        }
        if (read == 0) {
            return;
        }
        // Decode only the bytes that were actually received.
        String result = new String(buffer.array(), 0, read).trim();
        System.out.println("receive:" + result);
        channelToPendingWrites
                .computeIfAbsent(client, ch -> new ConcurrentLinkedQueue<>())
                .add(ByteBuffer.wrap(ACK));
        // Changing the interest set keeps the existing registration.
        key.interestOps(SelectionKey.OP_WRITE);
    }

    /** Flushes queued replies; goes back to read interest once everything has been sent. */
    private static void handleWrite(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        Queue<ByteBuffer> pending =
                channelToPendingWrites.computeIfAbsent(client, ch -> new ConcurrentLinkedQueue<>());
        if (pending.isEmpty()) {
            // wrap() yields a buffer positioned at 0, so the whole greeting is sent.
            pending.add(ByteBuffer.wrap(GREETING));
        }
        ByteBuffer head;
        while ((head = pending.peek()) != null) {
            client.write(head);
            if (head.hasRemaining()) {
                // Partial write: keep the buffer queued and wait for the next writable event.
                return;
            }
            pending.poll();
        }
        key.interestOps(SelectionKey.OP_READ);
    }

    /** Forgets a client: drops its queued replies, cancels the key and closes the channel. */
    private static void closeClient(SelectionKey key) {
        channelToPendingWrites.remove(key.channel());
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException ignored) {
            // Nothing useful can be done if closing an already broken connection fails.
        }
    }
}
讨论什么?
- 理解IO多路复用,Java中的概念分别代表了什么.
- 零拷贝? 不再需要拷贝了么?
- Java NIO 中的 register、Acceptable、Readable、Writable 分别代表什么意思?
IO模型I — 阻塞
- 系统调用
#include <stdio.h>
/* Prints a fixed greeting to standard output and exits successfully. */
int main() {
    /* No trailing newline: exactly the 13 bytes of the greeting are emitted. */
    fputs("Hello, World!", stdout);
    return 0;
}
输出结果
[root@iZbp11om21c05wzu8e4tx0Z test]# vim hello.c
[root@iZbp11om21c05wzu8e4tx0Z test]# gcc hello.c -o hello
[root@iZbp11om21c05wzu8e4tx0Z test]# strace ./hello
execve("./hello", ["./hello"], [/* 22 vars */]) = 0
brk(NULL) = 0x6fe000
mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f1fa6d87000
access("/etc/ld.so.preload", R_OK) = -1 ENOENT (No such file or directory)
open("/etc/ld.so.cache", O_RDONLY|O_CLOEXEC) = 3
fstat(3, {st_mode=S_IFREG|0644, st_size=30947, ...}) = 0
mmap(NULL, 30947, PROT_READ, MAP_PRIVATE, 3, 0) = 0x7f1fa6d7f000
close(3) = 0
open("/lib64/libc.so.6", O_RDONLY|O_CLOEXEC) = 3
read(3, "\177ELF\2\1\1\3\0\0\0\0\0\0\0\0\3\0>\0\1\0\0\0\20&\2\0\0\0\0\0"..., 832) = 832
fstat(3, {st_mode=S_IFREG|0755, st_size=2156160, ...}) = 0
mmap(NULL, 3985888, PROT_READ|PROT_EXEC, MAP_PRIVATE|MAP_DENYWRITE, 3, 0) = 0x7f1fa6799000
mprotect(0x7f1fa695c000, 2097152, PROT_NONE) = 0
mmap(0x7f1fa6b5c000, 24576, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_DENYWRITE, 3, 0x1c3000) = 0x7f1fa6b5c000
mmap(0x7f1fa6b62000, 16864, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_ANONYMOUS, -1, 0) = 0x7f1fa6b62000
close(3) = 0
mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f1fa6d7e000
mmap(NULL, 8192, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f1fa6d7c000
arch_prctl(ARCH_SET_FS, 0x7f1fa6d7c740) = 0
mprotect(0x7f1fa6b5c000, 16384, PROT_READ) = 0
mprotect(0x600000, 4096, PROT_READ) = 0
mprotect(0x7f1fa6d88000, 4096, PROT_READ) = 0
munmap(0x7f1fa6d7f000, 30947) = 0
fstat(1, {st_mode=S_IFCHR|0620, st_rdev=makedev(136, 0), ...}) = 0
mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f1fa6d86000
write(1, "Hello, World!", 13Hello, World!) = 13
exit_group(0) = ?
+++ exited with 0 +++
- 缓冲区
- 概念
- 多路
- 复用
- 可读、可写事件
- 边缘触发、水平触发
- 数据结构 - O(n), O(logn)
- 关联Java中的NIO
- 零拷贝
- 什么是零拷贝? 完全没有拷贝么?
- Reactor 模型 (原文章已经不能下载了) 《Scalable IO in Java》
- Redis中的使用(强烈建议阅读)
- Netty中的使用
实操
单线程
/*
 * Blocking, single-threaded TCP server on 127.0.0.1:11234.
 * Handles one connection at a time: reads up to 10 bytes, prints them,
 * answers with two fixed messages and closes the connection.
 */
int main() {
    int serv_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); /* AF_INET: IPv4, SOCK_STREAM: TCP */
    if (serv_sock == -1) {
        perror("socket");
        exit(1);
    }
    /* Address (IP + port) the listening socket is bound to. */
    struct sockaddr_in serv_addr;
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;                     /* IPv4 */
    serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); /* concrete IP address */
    serv_addr.sin_port = htons(11234);                  /* port, network byte order */
    int on = 1;
    if (setsockopt(serv_sock, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)) == -1) {
        printf("处理失败SO_REUSEPORT失败");
        exit(1);
    }
    if (bind(serv_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1) {
        perror("bind");
        exit(1);
    }
    if (listen(serv_sock, 20) == -1) {
        perror("listen");
        exit(1);
    }
    for (;;) {
        int fd = accept(serv_sock, NULL, NULL);
        if (fd == -1) {
            perror("accept");
            continue;
        }
        const char *hello = "hello chenshun!";
        char cc[20];
        ssize_t rr = read(fd, cc, 10);
        if (rr == -1) {
            /* Nothing was received: drop this connection and wait for the next one. */
            close(fd);
            continue;
        }
        cc[rr] = '\0'; /* read() does not terminate the buffer; %s needs a terminator */
        printf("i receive your message:%s\n", cc);
        write(fd, hello, strlen(hello));
        write(fd, "good bye!", strlen("good bye!"));
        close(fd);
    }
    return 0;
}
多线程
```c void run(void ff) { int fd = (int ) (ff); char *hello = “hello chenshun!”; char cc[11]; ssize_t rr = read(fd, cc, 10); if (rr == -1) { close(fd); } printf(“i receive your message:%s\n”, cc); write(fd, hello, strlen(hello)); write(fd, “good bye!”, strlen(“good bye!”)); close(fd); return 0; }
-
int main() { int serv_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); //AF_INT:ipv4, SOCK_STREAM:tcp协议 //将套接字和IP、端口绑定 struct sockaddr_in serv_addr = malloc(sizeof(struct sockaddr_in)); serv_addr->sin_family = AF_INET; //使用IPv4地址 serv_addr->sin_addr.s_addr = inet_addr(“127.0.0.1”); //具体的IP地址 serv_addr->sin_port = htons(11234); //端口 int on = 1; if (setsockopt(serv_sock, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)) == -1) { printf(“处理失败SO_REUSEPORT失败”); exit(1); } bind(serv_sock, (struct sockaddr ) serv_addr, sizeof(struct sockaddr)); listen(serv_sock, 20);
for (;;) {
int fd = accept(serv_sock, NULL, NULL);
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr, &stacksize);
if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
pthread_attr_setstacksize(&attr, stacksize);
int res = pthread_create(&thread, &attr, run, &fd);
printf("创建线程执行:%d\n", res);
if (res != 0) {
exit(-1);
}
}
return 0;
}
<a name="ga2zT"></a>
#### IO多路复用
```c
//
// Created by chenshun on 2022/3/21.
//
#include <sys/socket.h>
#include <sys/event.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
/* Size in bytes of the receive buffer used by handleRead. */
const size_t PAGE = 1024 * 16;
/* Bit flags for the events argument of updateEvent/delEvent; they can be OR-ed together. */
const int kReadEvent = 1;
const int kWriteEvent = 2;
/*
 * Registers fd with the kqueue kqFd for every event selected in the events
 * bit mask (kReadEvent and/or kWriteEvent). Each filter is added with its
 * own kevent() call; the result of that call is not inspected.
 */
void updateEvent(int kqFd, int fd, int events) {
    const int masks[2] = {kReadEvent, kWriteEvent};
    const short filters[2] = {EVFILT_READ, EVFILT_WRITE};
    for (int i = 0; i < 2; i++) {
        if ((events & masks[i]) == 0) {
            continue;
        }
        struct kevent change;
        EV_SET(&change, fd, filters[i], EV_ADD, 0, 0, 0);
        kevent(kqFd, &change, 1, NULL, 0, NULL);
    }
}
/*
 * Removes fd from the kqueue kqFd for every event selected in the events
 * bit mask (kReadEvent and/or kWriteEvent). Each filter is deleted with its
 * own kevent() call; the result of that call is not inspected.
 */
void delEvent(int kqFd, int fd, int events) {
    const int masks[2] = {kReadEvent, kWriteEvent};
    const short filters[2] = {EVFILT_READ, EVFILT_WRITE};
    for (int i = 0; i < 2; i++) {
        if ((events & masks[i]) == 0) {
            continue;
        }
        struct kevent change;
        EV_SET(&change, fd, filters[i], EV_DELETE, 0, 0, 0);
        kevent(kqFd, &change, 1, NULL, 0, NULL);
    }
}
/*
 * Accepts one pending connection on the listening socket socket_fd, switches
 * the new socket to non-blocking mode and registers it for read events on kq.
 * If accept() or fcntl() fails, the error is reported and nothing is registered.
 */
void handleAccept(int kq, int socket_fd) {
    struct sockaddr_storage sa;
    socklen_t salen = sizeof(sa);
    int client = accept(socket_fd, (struct sockaddr *) &sa, &salen);
    if (client == -1) {
        /* The listening socket is non-blocking, so there may be nothing to accept. */
        perror("accept");
        return;
    }
    int flags = fcntl(client, F_GETFL, 0);
    if (flags == -1 || fcntl(client, F_SETFL, flags | O_NONBLOCK) == -1) {
        perror("fcntl");
        close(client);
        return;
    }
    updateEvent(kq, client, kReadEvent);
}
/*
 * Handles a readable client socket fd registered on kq.
 * - End of stream or read error: unregisters the socket and closes it.
 * - The message "exit\r\n": says good bye and closes the socket.
 * - Anything else: prints the message and waits for the socket to become
 *   writable so that handleWrite can send the reply.
 */
void handleRead(int kq, int fd) {
    char buf[PAGE];
    /* Keep one byte free for the terminator; read() reports errors as -1, so keep the result signed. */
    ssize_t rr = read(fd, buf, PAGE - 1);
    if (rr <= 0) {
        delEvent(kq, fd, kReadEvent | kWriteEvent);
        close(fd);
        return;
    }
    buf[rr] = '\0'; /* strcmp and %s below require a terminated string */
    delEvent(kq, fd, kReadEvent);
    if (rr == 6 && !strcmp("exit\r\n", buf)) {
        write(fd, "good bye!\n", strlen("good bye!\n"));
        close(fd);
        return;
    }
    printf("receive bytes: %zu, i have receive your message: %s\n", (size_t) rr, buf);
    updateEvent(kq, fd, kWriteEvent);
}
/*
 * Handles a writable client socket fd registered on kq: sends the fixed reply,
 * then stops watching for write events and watches for read events again.
 */
void handleWrite(int kq, int fd) {
    static const char reply[] = "i have receive your message, expect your next message\n";
    /* sizeof counts the terminating NUL, which is not sent. */
    write(fd, reply, sizeof(reply) - 1);
    delEvent(kq, fd, kWriteEvent);
    updateEvent(kq, fd, kReadEvent);
}
/*
 * Runs one iteration of the event loop: waits up to waitms milliseconds for
 * events on kq and dispatches each of them.
 * A read event on the listening socket socket_fd means a new connection;
 * read/write events on any other descriptor belong to a client.
 */
void loop_once(int kq, int socket_fd, int waitms) {
    enum { MAX_EVENTS = 20 };
    struct timespec timeout;
    timeout.tv_sec = waitms / 1000;
    timeout.tv_nsec = (waitms % 1000) * 1000 * 1000;
    /* The buffer must really hold as many entries as kevent() is allowed to fill in. */
    struct kevent events[MAX_EVENTS];
    int retval = kevent(kq, NULL, 0, events, MAX_EVENTS, &timeout);
    if (retval == -1) {
        perror("kevent");
        return;
    }
    if (retval > 0) {
        printf("收到事件: %d 件\n", retval);
    }
    for (int i = 0; i < retval; i++) {
        int fd = (int) events[i].ident;
        if (events[i].filter == EVFILT_READ) {
            /* Readable event */
            if (fd == socket_fd) {
                handleAccept(kq, socket_fd);
            } else {
                handleRead(kq, fd);
            }
        } else if (events[i].filter == EVFILT_WRITE) {
            /* Writable event */
            handleWrite(kq, fd);
        }
    }
}
/*
 * kqueue-based TCP server on 127.0.0.1:19999.
 * Sets up a non-blocking listening socket, registers it for read events on a
 * new kqueue and then runs the event loop forever.
 */
int main() {
    int port = 19999;
    int socket_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (socket_fd == -1) {
        perror("socket");
        exit(1);
    }
    /* Address (IP + port) the listening socket is bound to. */
    struct sockaddr_in serv_addr;
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    serv_addr.sin_port = htons(port);

    int on = 1;
    setsockopt(socket_fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on));
    /* Make the listening socket non-blocking so that accept() never stalls the loop. */
    int flags = fcntl(socket_fd, F_GETFL, 0);
    fcntl(socket_fd, F_SETFL, flags | O_NONBLOCK);
    if (bind(socket_fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
        printf("bind 失败!");
        exit(1);
    }
    if (listen(socket_fd, 20) == -1) {
        perror("listen");
        exit(1);
    }
    printf("使用telnet 127.0.0.1 19999 来进行测试\n");
    printf("输入exit来断开TCP链接\n");
    int queue = kqueue();
    if (queue == -1) {
        perror("kqueue");
        exit(1);
    }
    /* Register the listening socket: a read event on it means a pending connection. */
    updateEvent(queue, socket_fd, kReadEvent);
    for (;;) {
        loop_once(queue, socket_fd, 10000);
    }
    return 0;
}
```
多线程IO多路复用
无