介绍

完成端口(Completion Port)是Windows下伸缩性最好的I/O模型,同时它也是最复杂的内核对象。完成端口内部提供了线程池的管理,可以避免反复创建线程的开销,同时可以根据CPU的个数灵活地决定线程个数,减少线程调度的次数,从而提高了程序的并行处理能力。
由于其稳定、高效的并发通信能力,目前,完成端口在面向现实应用的许多网络通信中应用很广泛,例如大型多人在线游戏、大型即时通信系统、网吧管理系统以及企业管理系统等具有大量并发用户请求的场合。

完成端口的相关概念

Windows操作系统的设计目的是提供一个安全、健壮的系统,能够运行各种各样的应用程序,为成千上万的用户服务。在Windows Sockets编程中,服务器程序的并发处理能力是非常重要的设计因素。根据并发管理方式的不同,通常将服务器程序划分为循环服务器和并发服务器两种类型

并发服务器的设计思路

循环服务器使用单个线程来处理客户请求,在同一时刻只能处理一个客户的请求,一般适合于小规模、简单的客户请求。
并发服务器使用线程等待客户请求,并使用独立的工作线程与客户进行请求处理或通信,由于每个客户拥有专门的通信服务线程,因此能够及时、公平地获得服务器的服务响应。
设计并发服务器的一种思路是:对于新来的客户请求,创建新的工作线程进行专门的请求处理。但是当客户数量巨大时,这种思路存在一些不足,主要表现在以下三点:
1)服务器能够创建的线程数量是有限的。每个线程在操作系统中都会消耗一定的资源,如果线程数量巨大,则可能无法为新建的线程分配足够的系统资源。
2)操作系统对线程的管理和调度会占用CPU资源,进而降低系统的响应速度。
3)频繁地创建线程和结束线程涉及反复的资源分配与释放,会浪费大量的系统资源。
由此来看,在客户数量巨大的情况下,一个新客户对应一个新线程的并发管理方式并不合适。
设计并发服务器的另一种思路是采用线程池对工作线程进行管理,它的原理是:并行的线程数量必须有一个上限,也就是说客户请求数量并不是决定服务器工作线程数量的主要因素。

线程池

线程池是一种多线程的处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个工作线程来使所有处理器保持工作。如果所有线程池线程都始终处于工作状态,但任务队列中包含挂起的工作,则线程池将在一段时间后创建另一个工作线程来执行挂起的工作,但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但它们要等到其他线程完成后才启动。
线程池的使用既限制了工作线程的数量,又避免了反复创建线程的开销,减少了线程调度的开销,从而提高了服务器程序的性能。

完成端口模型

完成端口模型使用线程池对线程进行管理,预先创建和维护线程,并规定了并行线程的数量。在处理多个并发异步I/O请求时,使用完成端口模型比在I/O请求时创建线程更快更有效。
可以把完成端口看成是系统维护的一个队列,操作系统把重叠I/O操作完成的事件通知放到该队列里。当某项I/O操作完成时,系统会向服务器完成端口发送一个I/O完成数据包,此操作在系统内部完成。应用程序在收到I/O完成数据包后,完成端口队列中的一个线程被唤醒,为客户提供服务。服务完成后,该线程会继续在完成端口上等待后续I/O请求事件的通知。
由于是暴露“操作完成”的事件通知,所以命名为“完成端口”。一个套接字被创建后,可以在任何时刻和一个完成端口联系起来。

工作线程与完成端口

一般来说,在I/O请求投递给完成端口对象后,一个应用程序需要创建多个工作线程来处理完成端口上的通知事件。那么究竟要创建多少个线程为完成端口提供服务呢?
实际上,工作线程的数量依赖于程序的总体设计情况。在理想情况下,应该对应一个CPU创建一个线程。因为在理想的完成端口模型中,每个线程都可以从系统获得一个原子性的时间片,轮番运行并检查完成端口,线程的切换是额外的开销。在实际开发的时候,还要考虑这些线程是否牵涉其他阻塞操作的情况(比如Sleep()或WaitForSingleObject()),从而导致程序进入了暂停、锁定或挂起状态,那么应允许另一个线程获得运行时间。因此,可以多创建几个线程,以便在发生阻塞的时候充分发挥系统的潜力。

单句柄数据(Per-handle Data)和单I/O操作数据(Per-I/O Operation Data)

单句柄数据对应着与某个套接字关联的数据,用来把客户数据和对应的完成通知关联起来,这样每次我们处理完成通知的时候,就能知道它是哪个客户的消息,并且可以根据客户的信息做出相应的反应。可以为单句柄数据定义一个数据结构来保存其关联的信息,其中包含了套接字句柄以及与该套接字有关的信息。
单I/O操作数据则不同,它记录了每次I/O通知的信息,允许我们在同一个句柄上同时管理多个I/O操作(读、写、多个读、多个写等)。单I/O操作数据可以是追加到一个OVERLAPPED结构末尾的任意长度字节。假如一个函数要求用到一个OVERLAPPED结构,可以为单I/O操作数据定义一个数据结构来保存具体的操作类型和数据,并将OVERLAPPED结构作为新结构的第一个元素使用。

完成端口模型的相关函数

(1)完成端口对象创建函数:CreateIoCompletionPort()
在设计基于完成端口模型的套接字应用程序时,首先需要调用CreateIoCompletionPort()创建完成端口对象,将一个I/O完成端口关联到任意多个句柄(这里是套接字)上,从而管理多个I/O请求。
该函数用于两个明显不同的目的:
● 用于创建一个完成端口对象;
● 用于将一个句柄同完成端口关联到一起。
CreateIoCompletionPort()函数的原型定义如下:

  1. HANDLE WINAPI CreateIoCompletionPort(
  2. __in HANDLE FileHandle,
  3. __in HANDLE ExistingCompletionPort,
  4. __in ULONG_PTR CompletionKey,
  5. __in DWORD NumberOfConcurrentThreads
  6. );

其中:
● FileHandle:是重叠I/O操作关联的文件句柄(此处是套接字)。如果FileHandle被指定为INVALID_HANDLE_VALUE,则CreateIoCompletionPort()函数创建一个与文件句柄无关的I/O完成端口。此时ExistingCompletionPort参数必须为NULL,且CompletionKey参数被忽略。
● ExistingCompletionPort:是已经存在的完成端口句柄。如果指定一个已存在的完成端口句柄,则函数将其关联到FileHandle参数指定的文件句柄上,如果设置为NULL,则函数创建一个与FileHandle参数指定的文件句柄相关联的新的I/O完成端口。
● CompletionKey:包含在每个I/O完成数据包中用于指定文件句柄的单句柄数据,它将与FileHandle文件句柄关联在一起,应用程序可以在此存储任意类型的信息,通常是一个指针。
● NumberOfConcurrentThreads:指定I/O完成端口上操作系统允许的并发处理I/O完成数据包的最大线程数量。如果ExistingCompletionPort参数为0,表示允许等同于处理器个数的线程访问该消息队列。该参数的指定需要具体考虑应用程序的总体设计情况。
面对不同的应用目的,参数的使用是有所区别的。如果用于创建一个完成端口,则唯一感兴趣的参数是NumberOfConcurrentThreads,前面三个参数都会被忽略。如果在完成端口上拥有足够多的工作线程来为I/O请求提供服务,则需要将套接字句柄与完成端口关联到一起,这要求在一个现有的完成端口上调用CreateIoCompletionPort()函数,同时为前三个参数提供套接字的信息,其中FileHandle参数指定一个要同完成端口关联在一起的套接字句柄。
如果函数执行成功,则返回与指定文件句柄(此处是套接字)相关联的I/O完成端口句柄,如果失败则返回NULL。可以调用GetLastError()函数获取错误信息。

(2)等待重叠I/O操作结果函数:GetQueuedCompletionStatus()
将套接字与完成端口关联后,应用程序就可以接收到与该套接字上执行的异步I/O操作完成后发送的通知。
在完成端口模型中,发起重叠I/O操作的方法与重叠I/O模型相似,但等待重叠I/O操作结果的方法并不相同,完成端口模型通过调用GetQueuedCompletionStatus()函数等待重叠I/O操作的完成结果,该函数的原型定义如下:

  1. BOOL WINAPI GetQueuedCompletionStatus(
  2. __in HANDLE CompletionPort,
  3. __out LPDWORD lpNumberOfBytes,
  4. __out PULONG_PTR lpCompletionKey,
  5. __out LPOVERLAPPED* lpOverlapped,
  6. __in DWORD dwMilliseconds
  7. );

其中:
● CompletionPort:完成端口对象句柄。
● lpNumberOfBytes:获取已经完成的I/O操作中传输的字节数。
● lpCompletionKey:获取与已经完成的I/O操作的文件句柄相关联的单句柄数据,在一个套接字首次与完成端口关联到一起的时候,那些数据便与一个特定的套接字句柄对应起来了,这些数据是运行CreateIoCompletionPort()函数时通过CompletionKey参数传递的。
● lpOverlapped:在完成的I/O操作开始时指定的重叠结构地址,在它后面跟随单I/O操作数据。
● dwMilliseconds:函数在完成端口上等待的时间。如果在等待时间内没有I/O操作完成通知包到达完成端口,则函数返回FALSE,lpOverlapped的值为NULL。如果该参数为INFINITE,则函数不会出现调用超时的情况,如果该参数为0,则函数立即返回。
I/O服务线程调用GetQueuedCompletionStatus()函数取得有事件发生的套接字信息,通过lpNumberOfBytes获得传输的字节数量,通过lpCompletionKey得到与套接字关联的单句柄数据,通过lpOverlapped参数得到投递I/O请求时使用的重叠对象地址,进一步得到单I/O操作数据。
如果函数从完成端口上获取到一个成功的I/O操作完成通知包,则函数返回非0值。函数将获取到的重叠操作信息保存在lpNumberOfBytes、lpCompletionKey和lpOverlapped参数中。
如果函数从完成端口上获取到一个失败的I/O操作完成通知包,则函数返回0。
如果函数调用超时,则返回0。具体错误可以通过GetLastError()函数获得。

完成端口模型的编程框架

完成端口模型依赖于Windows环境下的线程池机制进行异步I/O处理。套接字创建后,在完成端口模型下,当发生网络I/O时应用程序的执行过程是:操作系统把重叠I/O操作完成的事件通知放到队列里,当某项I/O操作完成时,系统会向服务器完成端口发送一个I/O完成数据包,应用程序在收到I/O完成数据包后,完成端口队列中的一个线程被唤醒,为客户提供服务。
以面向连接的数据接收为例,在完成端口模型下,套接字的编程框架如图8-10所示。Image00090.jpg
整体来看,基于完成端口模型的网络应用程序的基本流程如下。
在主程序中:
1)判断系统中安装了多少个处理器,创建n个工作线程,n一般取当前计算机中处理器个数。工作线程的主要功能是检测完成端口的状态,如果有来自客户的数据,则接收数据,处理请求;
2)初始化Windows Sockets环境,初始化套接字;
3)创建完成端口对象,将待处理网络请求的套接字与完成端口对象关联;
4)异步接收数据,无论能否接收到数据,都会直接返回。
在工作线程中:
1)调用GetQueuedCompletionStatus()函数检查完成端口的状态;
2)根据GetQueuedCompletionStatus()返回的数据和状态进行具体的请求处理。

2.基于完成端口模型的套接字通信服务器示例

以下示例实现了基于完成端口模型的套接字通信服务器,该服务器的主要功能是并发接收客户使用TCP协议发来的数据,打印接收到的数据的字节数。
(1)第一步:定义结构
1)PER_IO_DATA结构。PER_IO_DATA结构用于保存单I/O操作的相关数据,包含了重叠结构、缓冲区对象、缓冲区数组、接收的字节数等,定义如下:

  1. typedef struct
  2. {
  3. OVERLAPPED Overlapped; // 重叠结构
  4. WSABUF DataBuf; // 缓冲区对象
  5. CHAR Buffer[DEFAULT_BUFLEN]; // 缓冲区数组
  6. DWORD BytesRECV; // 接收的字节数
  7. } PER_IO_DATA, * LPPER_IO_DATA;

2)PER_HANDLE_DATA结构。PER_HANDLE_DATA结构用于保存单句柄数据,此处为与客户进行通信的套接字,定义如下:

  1. typedef struct
  2. {
  3. SOCKET Socket;
  4. } PER_HANDLE_DATA, * LPPER_HANDLE_DATA;

(2)第二步:实现工作线程ServerWorkerThread()
以系统中的CPU数量为参考,多个工作线程可并行地在多个套接字上进行数据处理。
工作线程函数ServerWorkerThread()的实现代码如下:

  1. DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID)
  2. 2 {
  3. 3  HANDLE CompletionPort = (HANDLE) CompletionPortID; // 完成端口句柄
  4. 4  DWORD BytesTransferred; // 数据传输的字节数
  5. 5  LPPER_HANDLE_DATA PerHandleData; // 套接字句柄结构
  6. 6  LPPER_IO_DATA PerIoData; // I/O操作结构
  7. 7  DWORD RecvBytes; // 接收的数量
  8. 8  DWORD Flags; // WSARecv()函数中的
  9. // 标志位
  10. 9  while(TRUE)
  11. 10  {
  12. 11  // 检查完成端口的状态
  13. 12  if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred,
  14.   (LPDWORD)&PerHandleData, (LPOVERLAPPED *) &PerIoData, INFINITE) == 0)
  15. 13  {
  16. 14  printf("GetQueuedCompletionStatus failed!\n");
  17. 15  return 0;
  18. 16  }
  19. 17  // 如果数据传送完了,则退出
  20. 18  if (BytesTransferred == 0)
  21. 19  {
  22. 20  printf("Closing socket %d\n", PerHandleData->Socket);
  23. 21  // 关闭套接字
  24. 22  if (closesocket(PerHandleData->Socket) == SOCKET_ERROR)
  25. 23  {
  26. 24  printf("closesocket failed with error %d\n", WSAGetLastError());
  27. 25  return 0;
  28. 26  }
  29. 27  // 释放结构资源
  30. 28  GlobalFree(PerHandleData);
  31. 29  GlobalFree(PerIoData);
  32. 30  continue;
  33. 31  }
  34. 32  // 如果还没有记录接收的数据数量,则将收到的字节数保存在PerIoData->BytesRECV中
  35. 33  if (PerIoData->BytesRECV == 0)
  36. 34  {
  37. 35  PerIoData->BytesRECV = BytesTransferred;
  38. 36  }
  39. 37  // 成功接收到数据
  40. 38  printf("\nBytes received: %d\n", BytesTransferred);
  41. 39  // 处理数据请求
  42. 40  //……
  43. 41  PerIoData->BytesRECV = 0;
  44. 42  Flags = 0;
  45. 43  ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));
  46. 44  PerIoData->DataBuf.len = DEFAULT_BUFLEN;
  47. 45  PerIoData->DataBuf.buf = PerIoData->Buffer;
  48. 46  iResult = WSARecv(PerHandleData->Socket, &(PerIoData->DataBuf), 1,
  49.   &RecvBytes, &Flags, &(PerIoData->Overlapped), NULL);
  50. 47  if ( iResult == SOCKET_ERROR)
  51. 48  {
  52. 49  if (WSAGetLastError() != ERROR_IO_PENDING)
  53. 50  {
  54. 51  printf("WSARecv() failed with error %d\n", WSAGetLastError());
  55. 52  return 0;
  56. 53  }
  57. 54  }
  58. 55  }
  59. 56 }

输入参数:LPVOID CompletionPortID,指向完成端口对象。
输出参数:
● 0:成功;
● -1:失败。
在每个工作线程中,调用GetQueuedCompletionStatus()函数检查完成端口状态,参数BytesTransferred用于接收传输数据的字节数。如果GetQueuedCompletionStatus()函数返回,但参数BytesTransferred为0,则说明客户程序已经退出,第17~31行代码关闭与客户进行通信的套接字,释放占用的资源。
PER_IO_DATA结构对象PerIoData用于保存I/O操作中的数据。如果其BytesRECV字段值非0,则打印接收到的字节数,之后第44~54行代码再次调用WSARecv()函数,投递另一个重叠I/O操作。
(3)第三步:实现主函数

  1. 1 int _tmain(int argc, _TCHAR* argv[])
  2. 2 {
  3. 3  SOCKADDR_IN InternetAddr; // 服务器地址
  4. 4  SOCKET ServerSocket = INVALID_SOCKET; // 监听套接字
  5. 5  SOCKET AcceptSocket = INVALID_SOCKET; // 与客户进行通信的套接字
  6. 6  HANDLE CompletionPort; // 完成端口句柄
  7. 7  SYSTEM_INFO SystemInfo; // 系统信息(这里主要用于获取CPU数量)
  8. 8  LPPER_HANDLE_DATA PerHandleData; // 套接字句柄结构
  9. 9  LPPER_IO_DATA PerIoData; // I/O操作结构
  10. 10  DWORD RecvBytes; // 接收到的字节数
  11. 11  DWORD Flags; // WSARecv()函数中指定的标志位
  12. 12  DWORD ThreadID; // 工作线程编号
  13. 13  WSADATA wsaData; // Windows Socket初始化信息
  14. 14  DWORD Ret; // 函数返回值
  15. 15  // 创建新的完成端口
  16. 16  if ((CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
  17. NULL,
  18.   0, 0)) == NULL)
  19. 17  {
  20. 18  printf( "CreateIoCompletionPort failed! \n");
  21. 19  return -1;
  22. 20  }
  23. 21  // 获取系统信息
  24. 22  GetSystemInfo(&SystemInfo);
  25. 23  // 根据CPU数量启动线程
  26. 24  for(int i = 0; i<SystemInfo.dwNumberOfProcessors * 2; i++)
  27. 25  {
  28. 26  HANDLE ThreadHandle;
  29. 27  // 创建线程,运行ServerWorkerThread()函数
  30. 28  if ((ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread,
  31.   CompletionPort, 0, &ThreadID)) == NULL)
  32. 29  {
  33. 30  printf("CreateThread() failed with error %d\n", GetLastError());
  34. 31  return -1;
  35. 32  }
  36. 33  CloseHandle(ThreadHandle);
  37. 34  }
  38. 35  // 初始化Windows Sockets环境
  39. 36  if ((Ret = WSAStartup(0x0202, &wsaData)) != 0)
  40. 37  {
  41. 38  printf("WSAStartup failed with error %d\n", Ret);
  42. 39  return -1;
  43. 40  }
  44. 41  // 创建监听套接字
  45. 42  ServerSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0,
  46.   WSA_FLAG_OVERLAPPED);
  47. 43  if (ServerSocket== INVALID_SOCKET)
  48. 44  {
  49. 45  printf("WSASocket() failed with error %d\n", WSAGetLastError());
  50. 46  return -1;
  51. 47  }
  52. 48  // 绑定到本地地址的端口
  53. 49  InternetAddr.sin_family = AF_INET;
  54. 50  InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
  55. 51  InternetAddr.sin_port = htons(DEFAULT_PORT);
  56. 52  iResult = bind(ServerSocket, (PSOCKADDR) &InternetAddr,
  57.   sizeof(InternetAddr));
  58. 53  if (iResult == SOCKET_ERROR)
  59. 54  {
  60. 55  printf("bind() failed with error %d\n", WSAGetLastError());
  61. 56  return -1;
  62. 57  }
  63. 58  // 开始监听
  64. 59  if (listen(ServerSocket, 5) == SOCKET_ERROR)
  65. 60  {
  66. 61  printf("listen() failed with error %d\n", WSAGetLastError());
  67. 62  return -1;
  68. 63  }
  69. 64  printf("TCP server starting\n");
  70. 65  // 监听端口打开,就开始在这里循环,一有套接字连上,WSAAccept就创建一个套接字,
  71. 66  // 这个套接字 和完成端口关联上
  72. 67  sockaddr_in addrClient;
  73. 68  int addrClientlen =sizeof( sockaddr_in);
  74. 69  while(TRUE)
  75. 70  {
  76. 71  // 等待客户连接
  77. 72  AcceptSocket = WSAAccept(ServerSocket, (sockaddr *)&addrClient,
  78.   &addrClientlen, NULL, 0);
  79. 73  if( AcceptSocket == SOCKET_ERROR)
  80. 74  {
  81. 75  printf("WSAAccept() failed with error %d\n", WSAGetLastError());
  82. 76  return -1;
  83. 77  }
  84. 78  // 分配并设置套接字句柄结构
  85. 79  PerHandleData = (LPPER_HANDLE_DATA) GlobalAlloc(GPTR,
  86.   sizeof(PER_HANDLE_DATA);
  87. 80  if (PerHandleData == NULL)
  88. 81  {
  89. 82  printf("GlobalAlloc() failed with error %d\n", GetLastError());
  90. 83  return -1;
  91. 84  }
  92. 85  PerHandleData->Socket = AcceptSocket;
  93. 86  // 将与客户进行通信的套接字Accept与完成端口CompletionPort相关联
  94. 87  if (CreateIoCompletionPort((HANDLE) AcceptSocket,
  95.   CompletionPort, (DWORD) PerHandleData, 0) == NULL)
  96. 88  {
  97. 89  printf("CreateIoCompletionPort failed!\n");
  98. 90  return -1;
  99. 91  }
  100. 92  // 为I/O操作结构分配内存空间
  101. 93  PerIoData = (LPPER_IO_DATA) GlobalAlloc(GPTR,sizeof(PER_IO_DATA));
  102. 94  if (PerIoData == NULL)
  103. 95  {
  104. 96  printf("GlobalAlloc() failed with error %d\n", GetLastError());
  105. 97  return -1;
  106. 98  }
  107. 99  // 初始化I/O操作结构
  108. 100  ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));
  109. 101  PerIoData->BytesRECV = 0;
  110. 102  PerIoData->DataBuf.len = DEFAULT_BUFLEN;
  111. 103  PerIoData->DataBuf.buf = PerIoData->Buffer;
  112. 104  Flags = 0;
  113. 105  // 接收数据,放到PerIoData中,通过工作线程函数取出
  114. 106  iResult = WSARecv(AcceptSocket, &(PerIoData->DataBuf), 1,
  115.   &RecvBytes, &Flags, &(PerIoData->Overlapped), NULL);
  116. 107  if (iResult == SOCKET_ERROR)
  117. 108  {
  118. 109  if (WSAGetLastError() != ERROR_IO_PENDING)
  119. 110  {
  120. 111  printf("WSARecv() failed! \n");
  121. 112  return -1;
  122. 113  }
  123. 114  }
  124. 115  }
  125. 116  return 0;
  126. 117  }

本函数借助线程池机制实现了基于流式套接字的并发服务器。
第15~34行代码创建完成端口对象CompletionPort,并参考当前计算机中CPU的数量创建工作线程,将新建的完成端口对象作为线程创建的参数。
第35~64行代码进行基于流式套接字的服务器程序初始化,首先初始化Windows Sockets环境,然后创建流式套接字,将其绑定到本地地址的27015端口上。
第65~115行代码在while循环上处理来自客户的连接请求,接受连接,并将得到的与客户进行通信的套接字保存到LPPER_HANDLE_DATA结构对象PerHandleData中,调用CreateIoCompletionPort()函数将AcceptSocket与前面的完成端口CompletionPort相关联,在AcceptSocket上调用WSARecv()函数,异步接收套接字上来自客户的数据,此时WSARecv()函数是异步调用的。另外,在工作线程中会检测完成端口对象的状态,并接收来自客户的数据。

完成端口模型评价

完成端口模型是应用程序使用线程池处理异步I/O请求的一种机制,在Windows服务平台上比较成熟,是伸缩性最好的,也是迄今为止最为复杂的一种I/O模型。当应用程序需要管理上千个套接字时,利用完成端口模型往往可以达到最佳的系统性能。
实际上完成端口是Windows I/O的一种结构,它可以接收多种对象的句柄,除了对套接字对象进行管理之外,还可以应用于文件对象等。