介绍
完成端口(Completion Port)是Windows下伸缩性最好的I/O模型,同时它也是最复杂的内核对象。完成端口内部提供了线程池的管理,可以避免反复创建线程的开销,同时可以根据CPU的个数灵活地决定线程个数,减少线程调度的次数,从而提高了程序的并行处理能力。
由于其稳定、高效的并发通信能力,目前,完成端口在面向现实应用的许多网络通信中应用很广泛,例如大型多人在线游戏、大型即时通信系统、网吧管理系统以及企业管理系统等具有大量并发用户请求的场合。
完成端口的相关概念
Windows操作系统的设计目的是提供一个安全、健壮的系统,能够运行各种各样的应用程序,为成千上万的用户服务。在Windows Sockets编程中,服务器程序的并发处理能力是非常重要的设计因素。根据并发管理方式的不同,通常将服务器程序划分为循环服务器和并发服务器两种类型
并发服务器的设计思路
循环服务器使用单个线程来处理客户请求,在同一时刻只能处理一个客户的请求,一般适合于小规模、简单的客户请求。
并发服务器使用线程等待客户请求,并使用独立的工作线程与客户进行请求处理或通信,由于每个客户拥有专门的通信服务线程,因此能够及时、公平地获得服务器的服务响应。
设计并发服务器的一种思路是:对于新来的客户请求,创建新的工作线程进行专门的请求处理。但是当客户数量巨大时,这种思路存在一些不足,主要表现在以下三点:
1)服务器能够创建的线程数量是有限的。每个线程在操作系统中都会消耗一定的资源,如果线程数量巨大,则可能无法为新建的线程分配足够的系统资源。
2)操作系统对线程的管理和调度会占用CPU资源,进而降低系统的响应速度。
3)频繁地创建线程和结束线程涉及反复的资源分配与释放,会浪费大量的系统资源。
由此来看,在客户数量巨大的情况下,一个新客户对应一个新线程的并发管理方式并不合适。
设计并发服务器的另一种思路是采用线程池对工作线程进行管理,它的原理是:并行的线程数量必须有一个上限,也就是说客户请求数量并不是决定服务器工作线程数量的主要因素。
线程池
线程池是一种多线程的处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个工作线程来使所有处理器保持工作。如果所有线程池线程都始终处于工作状态,但任务队列中包含挂起的工作,则线程池将在一段时间后创建另一个工作线程来执行挂起的工作,但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但它们要等到其他线程完成后才启动。
线程池的使用既限制了工作线程的数量,又避免了反复创建线程的开销,减少了线程调度的开销,从而提高了服务器程序的性能。
完成端口模型
完成端口模型使用线程池对线程进行管理,预先创建和维护线程,并规定了并行线程的数量。在处理多个并发异步I/O请求时,使用完成端口模型比在I/O请求时创建线程更快更有效。
可以把完成端口看成是系统维护的一个队列,操作系统把重叠I/O操作完成的事件通知放到该队列里。当某项I/O操作完成时,系统会向服务器完成端口发送一个I/O完成数据包,此操作在系统内部完成。应用程序在收到I/O完成数据包后,完成端口队列中的一个线程被唤醒,为客户提供服务。服务完成后,该线程会继续在完成端口上等待后续I/O请求事件的通知。
由于是暴露“操作完成”的事件通知,所以命名为“完成端口”。一个套接字被创建后,可以在任何时刻和一个完成端口联系起来。
工作线程与完成端口
一般来说,在I/O请求投递给完成端口对象后,一个应用程序需要创建多个工作线程来处理完成端口上的通知事件。那么究竟要创建多少个线程为完成端口提供服务呢?
实际上,工作线程的数量依赖于程序的总体设计情况。在理想情况下,应该对应一个CPU创建一个线程。因为在理想的完成端口模型中,每个线程都可以从系统获得一个原子性的时间片,轮番运行并检查完成端口,线程的切换是额外的开销。在实际开发的时候,还要考虑这些线程是否牵涉其他阻塞操作的情况(比如Sleep()或WaitForSingleObject()),从而导致程序进入了暂停、锁定或挂起状态,那么应允许另一个线程获得运行时间。因此,可以多创建几个线程,以便在发生阻塞的时候充分发挥系统的潜力。
单句柄数据(Per-handle Data)和单I/O操作数据(Per-I/O Operation Data)
单句柄数据对应着与某个套接字关联的数据,用来把客户数据和对应的完成通知关联起来,这样每次我们处理完成通知的时候,就能知道它是哪个客户的消息,并且可以根据客户的信息做出相应的反应。可以为单句柄数据定义一个数据结构来保存其关联的信息,其中包含了套接字句柄以及与该套接字有关的信息。
单I/O操作数据则不同,它记录了每次I/O通知的信息,允许我们在同一个句柄上同时管理多个I/O操作(读、写、多个读、多个写等)。单I/O操作数据可以是追加到一个OVERLAPPED结构末尾的任意长度字节。假如一个函数要求用到一个OVERLAPPED结构,可以为单I/O操作数据定义一个数据结构来保存具体的操作类型和数据,并将OVERLAPPED结构作为新结构的第一个元素使用。
完成端口模型的相关函数
(1)完成端口对象创建函数:CreateIoCompletionPort()
在设计基于完成端口模型的套接字应用程序时,首先需要调用CreateIoCompletionPort()创建完成端口对象,将一个I/O完成端口关联到任意多个句柄(这里是套接字)上,从而管理多个I/O请求。
该函数用于两个明显不同的目的:
● 用于创建一个完成端口对象;
● 用于将一个句柄同完成端口关联到一起。
CreateIoCompletionPort()函数的原型定义如下:
HANDLE WINAPI CreateIoCompletionPort(
__in HANDLE FileHandle,
__in HANDLE ExistingCompletionPort,
__in ULONG_PTR CompletionKey,
__in DWORD NumberOfConcurrentThreads
);
其中:
● FileHandle:是重叠I/O操作关联的文件句柄(此处是套接字)。如果FileHandle被指定为INVALID_HANDLE_VALUE,则CreateIoCompletionPort()函数创建一个与文件句柄无关的I/O完成端口。此时ExistingCompletionPort参数必须为NULL,且CompletionKey参数被忽略。
● ExistingCompletionPort:是已经存在的完成端口句柄。如果指定一个已存在的完成端口句柄,则函数将其关联到FileHandle参数指定的文件句柄上,如果设置为NULL,则函数创建一个与FileHandle参数指定的文件句柄相关联的新的I/O完成端口。
● CompletionKey:包含在每个I/O完成数据包中用于指定文件句柄的单句柄数据,它将与FileHandle文件句柄关联在一起,应用程序可以在此存储任意类型的信息,通常是一个指针。
● NumberOfConcurrentThreads:指定I/O完成端口上操作系统允许的并发处理I/O完成数据包的最大线程数量。如果ExistingCompletionPort参数为0,表示允许等同于处理器个数的线程访问该消息队列。该参数的指定需要具体考虑应用程序的总体设计情况。
面对不同的应用目的,参数的使用是有所区别的。如果用于创建一个完成端口,则唯一感兴趣的参数是NumberOfConcurrentThreads,前面三个参数都会被忽略。如果在完成端口上拥有足够多的工作线程来为I/O请求提供服务,则需要将套接字句柄与完成端口关联到一起,这要求在一个现有的完成端口上调用CreateIoCompletionPort()函数,同时为前三个参数提供套接字的信息,其中FileHandle参数指定一个要同完成端口关联在一起的套接字句柄。
如果函数执行成功,则返回与指定文件句柄(此处是套接字)相关联的I/O完成端口句柄,如果失败则返回NULL。可以调用GetLastError()函数获取错误信息。
(2)等待重叠I/O操作结果函数:GetQueuedCompletionStatus()
将套接字与完成端口关联后,应用程序就可以接收到与该套接字上执行的异步I/O操作完成后发送的通知。
在完成端口模型中,发起重叠I/O操作的方法与重叠I/O模型相似,但等待重叠I/O操作结果的方法并不相同,完成端口模型通过调用GetQueuedCompletionStatus()函数等待重叠I/O操作的完成结果,该函数的原型定义如下:
BOOL WINAPI GetQueuedCompletionStatus(
__in HANDLE CompletionPort,
__out LPDWORD lpNumberOfBytes,
__out PULONG_PTR lpCompletionKey,
__out LPOVERLAPPED* lpOverlapped,
__in DWORD dwMilliseconds
);
其中:
● CompletionPort:完成端口对象句柄。
● lpNumberOfBytes:获取已经完成的I/O操作中传输的字节数。
● lpCompletionKey:获取与已经完成的I/O操作的文件句柄相关联的单句柄数据,在一个套接字首次与完成端口关联到一起的时候,那些数据便与一个特定的套接字句柄对应起来了,这些数据是运行CreateIoCompletionPort()函数时通过CompletionKey参数传递的。
● lpOverlapped:在完成的I/O操作开始时指定的重叠结构地址,在它后面跟随单I/O操作数据。
● dwMilliseconds:函数在完成端口上等待的时间。如果在等待时间内没有I/O操作完成通知包到达完成端口,则函数返回FALSE,lpOverlapped的值为NULL。如果该参数为INFINITE,则函数不会出现调用超时的情况,如果该参数为0,则函数立即返回。
I/O服务线程调用GetQueuedCompletionStatus()函数取得有事件发生的套接字信息,通过lpNumberOfBytes获得传输的字节数量,通过lpCompletionKey得到与套接字关联的单句柄数据,通过lpOverlapped参数得到投递I/O请求时使用的重叠对象地址,进一步得到单I/O操作数据。
如果函数从完成端口上获取到一个成功的I/O操作完成通知包,则函数返回非0值。函数将获取到的重叠操作信息保存在lpNumberOfBytes、lpCompletionKey和lpOverlapped参数中。
如果函数从完成端口上获取到一个失败的I/O操作完成通知包,则函数返回0。
如果函数调用超时,则返回0。具体错误可以通过GetLastError()函数获得。
完成端口模型的编程框架
完成端口模型依赖于Windows环境下的线程池机制进行异步I/O处理。套接字创建后,在完成端口模型下,当发生网络I/O时应用程序的执行过程是:操作系统把重叠I/O操作完成的事件通知放到队列里,当某项I/O操作完成时,系统会向服务器完成端口发送一个I/O完成数据包,应用程序在收到I/O完成数据包后,完成端口队列中的一个线程被唤醒,为客户提供服务。
以面向连接的数据接收为例,在完成端口模型下,套接字的编程框架如图8-10所示。
整体来看,基于完成端口模型的网络应用程序的基本流程如下。
在主程序中:
1)判断系统中安装了多少个处理器,创建n个工作线程,n一般取当前计算机中处理器个数。工作线程的主要功能是检测完成端口的状态,如果有来自客户的数据,则接收数据,处理请求;
2)初始化Windows Sockets环境,初始化套接字;
3)创建完成端口对象,将待处理网络请求的套接字与完成端口对象关联;
4)异步接收数据,无论能否接收到数据,都会直接返回。
在工作线程中:
1)调用GetQueuedCompletionStatus()函数检查完成端口的状态;
2)根据GetQueuedCompletionStatus()返回的数据和状态进行具体的请求处理。
2.基于完成端口模型的套接字通信服务器示例
以下示例实现了基于完成端口模型的套接字通信服务器,该服务器的主要功能是并发接收客户使用TCP协议发来的数据,打印接收到的数据的字节数。
(1)第一步:定义结构
1)PER_IO_DATA结构。PER_IO_DATA结构用于保存单I/O操作的相关数据,包含了重叠结构、缓冲区对象、缓冲区数组、接收的字节数等,定义如下:
typedef struct
{
OVERLAPPED Overlapped; // 重叠结构
WSABUF DataBuf; // 缓冲区对象
CHAR Buffer[DEFAULT_BUFLEN]; // 缓冲区数组
DWORD BytesRECV; // 接收的字节数
} PER_IO_DATA, * LPPER_IO_DATA;
2)PER_HANDLE_DATA结构。PER_HANDLE_DATA结构用于保存单句柄数据,此处为与客户进行通信的套接字,定义如下:
typedef struct
{
SOCKET Socket;
} PER_HANDLE_DATA, * LPPER_HANDLE_DATA;
(2)第二步:实现工作线程ServerWorkerThread()
以系统中的CPU数量为参考,多个工作线程可并行地在多个套接字上进行数据处理。
工作线程函数ServerWorkerThread()的实现代码如下:
DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID)
2 {
3 HANDLE CompletionPort = (HANDLE) CompletionPortID; // 完成端口句柄
4 DWORD BytesTransferred; // 数据传输的字节数
5 LPPER_HANDLE_DATA PerHandleData; // 套接字句柄结构
6 LPPER_IO_DATA PerIoData; // I/O操作结构
7 DWORD RecvBytes; // 接收的数量
8 DWORD Flags; // WSARecv()函数中的
// 标志位
9 while(TRUE)
10 {
11 // 检查完成端口的状态
12 if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred,
(LPDWORD)&PerHandleData, (LPOVERLAPPED *) &PerIoData, INFINITE) == 0)
13 {
14 printf("GetQueuedCompletionStatus failed!\n");
15 return 0;
16 }
17 // 如果数据传送完了,则退出
18 if (BytesTransferred == 0)
19 {
20 printf("Closing socket %d\n", PerHandleData->Socket);
21 // 关闭套接字
22 if (closesocket(PerHandleData->Socket) == SOCKET_ERROR)
23 {
24 printf("closesocket failed with error %d\n", WSAGetLastError());
25 return 0;
26 }
27 // 释放结构资源
28 GlobalFree(PerHandleData);
29 GlobalFree(PerIoData);
30 continue;
31 }
32 // 如果还没有记录接收的数据数量,则将收到的字节数保存在PerIoData->BytesRECV中
33 if (PerIoData->BytesRECV == 0)
34 {
35 PerIoData->BytesRECV = BytesTransferred;
36 }
37 // 成功接收到数据
38 printf("\nBytes received: %d\n", BytesTransferred);
39 // 处理数据请求
40 //……
41 PerIoData->BytesRECV = 0;
42 Flags = 0;
43 ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));
44 PerIoData->DataBuf.len = DEFAULT_BUFLEN;
45 PerIoData->DataBuf.buf = PerIoData->Buffer;
46 iResult = WSARecv(PerHandleData->Socket, &(PerIoData->DataBuf), 1,
&RecvBytes, &Flags, &(PerIoData->Overlapped), NULL);
47 if ( iResult == SOCKET_ERROR)
48 {
49 if (WSAGetLastError() != ERROR_IO_PENDING)
50 {
51 printf("WSARecv() failed with error %d\n", WSAGetLastError());
52 return 0;
53 }
54 }
55 }
56 }
输入参数:LPVOID CompletionPortID,指向完成端口对象。
输出参数:
● 0:成功;
● -1:失败。
在每个工作线程中,调用GetQueuedCompletionStatus()函数检查完成端口状态,参数BytesTransferred用于接收传输数据的字节数。如果GetQueuedCompletionStatus()函数返回,但参数BytesTransferred为0,则说明客户程序已经退出,第17~31行代码关闭与客户进行通信的套接字,释放占用的资源。
PER_IO_DATA结构对象PerIoData用于保存I/O操作中的数据。如果其BytesRECV字段值非0,则打印接收到的字节数,之后第44~54行代码再次调用WSARecv()函数,投递另一个重叠I/O操作。
(3)第三步:实现主函数
1 int _tmain(int argc, _TCHAR* argv[])
2 {
3 SOCKADDR_IN InternetAddr; // 服务器地址
4 SOCKET ServerSocket = INVALID_SOCKET; // 监听套接字
5 SOCKET AcceptSocket = INVALID_SOCKET; // 与客户进行通信的套接字
6 HANDLE CompletionPort; // 完成端口句柄
7 SYSTEM_INFO SystemInfo; // 系统信息(这里主要用于获取CPU数量)
8 LPPER_HANDLE_DATA PerHandleData; // 套接字句柄结构
9 LPPER_IO_DATA PerIoData; // I/O操作结构
10 DWORD RecvBytes; // 接收到的字节数
11 DWORD Flags; // WSARecv()函数中指定的标志位
12 DWORD ThreadID; // 工作线程编号
13 WSADATA wsaData; // Windows Socket初始化信息
14 DWORD Ret; // 函数返回值
15 // 创建新的完成端口
16 if ((CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
NULL,
0, 0)) == NULL)
17 {
18 printf( "CreateIoCompletionPort failed! \n");
19 return -1;
20 }
21 // 获取系统信息
22 GetSystemInfo(&SystemInfo);
23 // 根据CPU数量启动线程
24 for(int i = 0; i<SystemInfo.dwNumberOfProcessors * 2; i++)
25 {
26 HANDLE ThreadHandle;
27 // 创建线程,运行ServerWorkerThread()函数
28 if ((ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread,
CompletionPort, 0, &ThreadID)) == NULL)
29 {
30 printf("CreateThread() failed with error %d\n", GetLastError());
31 return -1;
32 }
33 CloseHandle(ThreadHandle);
34 }
35 // 初始化Windows Sockets环境
36 if ((Ret = WSAStartup(0x0202, &wsaData)) != 0)
37 {
38 printf("WSAStartup failed with error %d\n", Ret);
39 return -1;
40 }
41 // 创建监听套接字
42 ServerSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0,
WSA_FLAG_OVERLAPPED);
43 if (ServerSocket== INVALID_SOCKET)
44 {
45 printf("WSASocket() failed with error %d\n", WSAGetLastError());
46 return -1;
47 }
48 // 绑定到本地地址的端口
49 InternetAddr.sin_family = AF_INET;
50 InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
51 InternetAddr.sin_port = htons(DEFAULT_PORT);
52 iResult = bind(ServerSocket, (PSOCKADDR) &InternetAddr,
sizeof(InternetAddr));
53 if (iResult == SOCKET_ERROR)
54 {
55 printf("bind() failed with error %d\n", WSAGetLastError());
56 return -1;
57 }
58 // 开始监听
59 if (listen(ServerSocket, 5) == SOCKET_ERROR)
60 {
61 printf("listen() failed with error %d\n", WSAGetLastError());
62 return -1;
63 }
64 printf("TCP server starting\n");
65 // 监听端口打开,就开始在这里循环,一有套接字连上,WSAAccept就创建一个套接字,
66 // 这个套接字 和完成端口关联上
67 sockaddr_in addrClient;
68 int addrClientlen =sizeof( sockaddr_in);
69 while(TRUE)
70 {
71 // 等待客户连接
72 AcceptSocket = WSAAccept(ServerSocket, (sockaddr *)&addrClient,
&addrClientlen, NULL, 0);
73 if( AcceptSocket == SOCKET_ERROR)
74 {
75 printf("WSAAccept() failed with error %d\n", WSAGetLastError());
76 return -1;
77 }
78 // 分配并设置套接字句柄结构
79 PerHandleData = (LPPER_HANDLE_DATA) GlobalAlloc(GPTR,
sizeof(PER_HANDLE_DATA);
80 if (PerHandleData == NULL)
81 {
82 printf("GlobalAlloc() failed with error %d\n", GetLastError());
83 return -1;
84 }
85 PerHandleData->Socket = AcceptSocket;
86 // 将与客户进行通信的套接字Accept与完成端口CompletionPort相关联
87 if (CreateIoCompletionPort((HANDLE) AcceptSocket,
CompletionPort, (DWORD) PerHandleData, 0) == NULL)
88 {
89 printf("CreateIoCompletionPort failed!\n");
90 return -1;
91 }
92 // 为I/O操作结构分配内存空间
93 PerIoData = (LPPER_IO_DATA) GlobalAlloc(GPTR,sizeof(PER_IO_DATA));
94 if (PerIoData == NULL)
95 {
96 printf("GlobalAlloc() failed with error %d\n", GetLastError());
97 return -1;
98 }
99 // 初始化I/O操作结构
100 ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));
101 PerIoData->BytesRECV = 0;
102 PerIoData->DataBuf.len = DEFAULT_BUFLEN;
103 PerIoData->DataBuf.buf = PerIoData->Buffer;
104 Flags = 0;
105 // 接收数据,放到PerIoData中,通过工作线程函数取出
106 iResult = WSARecv(AcceptSocket, &(PerIoData->DataBuf), 1,
&RecvBytes, &Flags, &(PerIoData->Overlapped), NULL);
107 if (iResult == SOCKET_ERROR)
108 {
109 if (WSAGetLastError() != ERROR_IO_PENDING)
110 {
111 printf("WSARecv() failed! \n");
112 return -1;
113 }
114 }
115 }
116 return 0;
117 }
本函数借助线程池机制实现了基于流式套接字的并发服务器。
第15~34行代码创建完成端口对象CompletionPort,并参考当前计算机中CPU的数量创建工作线程,将新建的完成端口对象作为线程创建的参数。
第35~64行代码进行基于流式套接字的服务器程序初始化,首先初始化Windows Sockets环境,然后创建流式套接字,将其绑定到本地地址的27015端口上。
第65~115行代码在while循环上处理来自客户的连接请求,接受连接,并将得到的与客户进行通信的套接字保存到LPPER_HANDLE_DATA结构对象PerHandleData中,调用CreateIoCompletionPort()函数将AcceptSocket与前面的完成端口CompletionPort相关联,在AcceptSocket上调用WSARecv()函数,异步接收套接字上来自客户的数据,此时WSARecv()函数是异步调用的。另外,在工作线程中会检测完成端口对象的状态,并接收来自客户的数据。
完成端口模型评价
完成端口模型是应用程序使用线程池处理异步I/O请求的一种机制,在Windows服务平台上比较成熟,是伸缩性最好的,也是迄今为止最为复杂的一种I/O模型。当应用程序需要管理上千个套接字时,利用完成端口模型往往可以达到最佳的系统性能。
实际上完成端口是Windows I/O的一种结构,它可以接收多种对象的句柄,除了对套接字对象进行管理之外,还可以应用于文件对象等。