活动对象

活动对象模式将执行与对象的成员函数解耦,每个对象会留在在自己的控制线程中。其目标是通过使用异步方法,处理调度器的请求,从而触发并发。维基百科:Active object。所以,这种模式也称为并发对象模式。

客户端的调用会转到代理,代理表现为活动对象的接口。服务提供活动对象的实现,并在单独的线程中运行。代理在运行时将客户端的调用转换为对服务的调用,调度程序将方法加入到激活列表中。调度器与服务在相同的线程中活动,并将方法调用从激活列表中取出,再将它们分派到相应的服务上。最后,客户端可以通过future从代理处获取最终的结果。

组件

活动对象模式由六个组件组成:

  1. 代理为活动对象的可访问方法提供接口。代理将触发激活列表的方法,并请求对象的构造。并且,代理和客户端运行在相同的线程中。
  2. 方法请求类定义了执行活动对象的接口。
  3. 激活列表的目标是维护挂起的请求,激活列表将客户端线程与活动对象线程解耦。代理对入队请求的进行处理,而调度器将请求移出队列。
  4. 调度器与代理可在不同的线程中运行。调度器会在活动对象的线程中运行,并决定接下来执行激活列表中的哪个请求。
  5. 可以通过服务实现活动对象,并在活动对象的线程中运行,服务也支持代理接口。
  6. future是由代理创造的,客户端可以从future上获取活动对象调用的结果。客户端可以安静等待结果,也可以对结果进行轮询。

下面的图片显示了消息的顺序。

活动对象 - 图1

代理

代理设计模式是《设计模式:可重用的面向对象软件的元素》中的经典模式,代理是其他对象的代表。典型的代理可以是远程代理CORBA、安全代理、虚拟代理或智能指针,如std::shared_ptr。每个代理会为它所代表的对象添加额外的功能。远程代理代表远程对象,并使客户端产生本地对象的错觉。安全代理通过对数据进行加密和解密,将不安全的连接转换为安全的连接。虚拟代理以惰性的方式封装对象的创建,智能指针将接管底层内存的生存期。

活动对象 - 图2

  • 代理具有与RealSubject相同的接口,用于管理引用,还有subject的生命周期。
  • 与Subject具有相同的接口,如代理和RealSubject。
  • RealSubject用于提供具体的功能。

关于代理模式的更多细节,可以参考Wikipedia页面。

优点和缺点

介绍Active Object模式的最小实现前,先了解一下它的优点和缺点。

  • 优点:
    • 同步只需要在活动对象的线程上进行,不需要在客户端的线程上进行。
    • 客户端(用户)和服务器(实现者)之间的解耦,同步的挑战则在实现者的一边。
    • 由于客户端为异步请求,所以系统的吞吐量提高了,从而调用处理密集型方法不会阻塞整个系统。
    • 调度器可以实现各种策略来执行挂起请求,因此可以按不同的顺序执行入队请求。
  • 缺点:
    • 如果请求的粒度太细,则活动对象模式(如代理、激活列表和调度器)的性能开销可能过大。
    • 由于调度器的调度策略和操作系统的调度互相影响,调试活动对象模式通常非常困难,尤其是以不同顺序执行请求的情况下。

具体实现

下面的示例展示了活动对象模式的简单实现。我没有定义一个请求,这应该由代理和服务实现。而且,当请求调度程序执行下一个请求时,服务应该只执行这个请求。

所涉及的类型为future<vector<future<pair<bool, int>>>>,这个类型的标识有点长。为了提高可读性,我使用了声明(第16 - 37行)。

  1. // activeObject.cpp
  2. #include <algorithm>
  3. #include <deque>
  4. #include <functional>
  5. #include <future>
  6. #include <iostream>
  7. #include <memory>
  8. #include <mutex>
  9. #include <numeric>
  10. #include <random>
  11. #include <thread>
  12. #include <utility>
  13. #include <vector>
  14. using std::async;
  15. using std::boolalpha;
  16. using std::cout;
  17. using std::deque;
  18. using std::distance;
  19. using std::endl;
  20. using std::for_each;
  21. using std::find_if;
  22. using std::future;
  23. using std::lock_guard;
  24. using std::make_move_iterator;
  25. using std::make_pair;
  26. using std::move;
  27. using std::mt19937;
  28. using std::mutex;
  29. using std::packaged_task;
  30. using std::pair;
  31. using std::random_device;
  32. using std::sort;
  33. using std::thread;
  34. using std::uniform_int_distribution;
  35. using std::vector;
  36. class IsPrime {
  37. public:
  38. pair<bool, int> operator()(int i) {
  39. for (int j = 2; j * j <= i; ++j) {
  40. if (i % j == 0)return std::make_pair(false, i);
  41. }
  42. return std::make_pair(true, i);
  43. }
  44. };
  45. class ActivaeObject {
  46. public:
  47. future<pair<bool, int>> enqueueTask(int i) {
  48. IsPrime isPrime;
  49. packaged_task<pair<bool, int>(int)> newJob(isPrime);
  50. auto isPrimeFuture = newJob.get_future();
  51. auto pair = make_pair(move(newJob), i);
  52. {
  53. lock_guard<mutex> lockGuard(activationListMutex);
  54. activationList.push_back(move(pair));
  55. }
  56. return isPrimeFuture;
  57. }
  58. void run() {
  59. thread servant([this] {
  60. while (!isEmpty()) {
  61. auto myTask = dequeueTask();
  62. myTask.first(myTask.second);
  63. }
  64. });
  65. servant.join();
  66. }
  67. private:
  68. pair<packaged_task<pair<bool, int>(int)>, int> dequeueTask() {
  69. lock_guard<mutex> lockGuard(activationListMutex);
  70. auto myTask = std::move(activationList.front());
  71. activationList.pop_front();
  72. return myTask;
  73. }
  74. bool isEmpty() {
  75. lock_guard<mutex> lockGuard(activationListMutex);
  76. auto empty = activationList.empty();
  77. return empty;
  78. }
  79. deque<pair<packaged_task<pair<bool, int>(int)>, int >> activationList;
  80. mutex activationListMutex;
  81. };
  82. vector<int> getRandNumber(int number) {
  83. random_device seed;
  84. mt19937 engine(seed());
  85. uniform_int_distribution<> dist(1000000, 1000000000);
  86. vector<int> numbers;
  87. for (long long i = 0; i < number; ++i) numbers.push_back(dist(engine));
  88. return numbers;
  89. }
  90. future<vector<future<pair<bool, int>>>> getFutures(ActivaeObject& activeObject,
  91. int numberPrimes) {
  92. return async([&activeObject, numberPrimes] {
  93. vector<future<pair<bool, int>>> futures;
  94. auto randNumbers = getRandNumber(numberPrimes);
  95. for (auto numb : randNumbers) {
  96. futures.push_back(activeObject.enqueueTask(numb));
  97. }
  98. return futures;
  99. });
  100. }
  101. int main() {
  102. cout << boolalpha << endl;
  103. ActivaeObject activeObject;
  104. // a few clients enqueue work concurrently
  105. auto client1 = getFutures(activeObject, 1998);
  106. auto client2 = getFutures(activeObject, 2003);
  107. auto client3 = getFutures(activeObject, 2011);
  108. auto client4 = getFutures(activeObject, 2014);
  109. auto client5 = getFutures(activeObject, 2017);
  110. // give me the futures
  111. auto futures = client1.get();
  112. auto futures2 = client2.get();
  113. auto futures3 = client3.get();
  114. auto futures4 = client4.get();
  115. auto futures5 = client5.get();
  116. // put all futures together
  117. futures.insert(futures.end(), make_move_iterator(futures2.begin()),
  118. make_move_iterator(futures2.end()));
  119. futures.insert(futures.end(), make_move_iterator(futures3.begin()),
  120. make_move_iterator(futures3.end()));
  121. futures.insert(futures.end(), make_move_iterator(futures4.begin()),
  122. make_move_iterator(futures4.end()));
  123. futures.insert(futures.end(), make_move_iterator(futures5.begin()),
  124. make_move_iterator(futures5.end()));
  125. // run the promises
  126. activeObject.run();
  127. // get the results from the futures
  128. vector<pair<bool, int>> futResults;
  129. futResults.reserve(futResults.size());
  130. for (auto& fut : futures)futResults.push_back(fut.get());
  131. sort(futResults.begin(), futResults.end());
  132. // separate the primes from the non-primes
  133. auto prIt = find_if(futResults.begin(), futResults.end(),
  134. [](pair<bool, int>pa) {return pa.first == true; });
  135. cout << "Number primes: " << distance(prIt, futResults.end()) << endl;
  136. cout << "Primes: " << endl;
  137. for_each(prIt, futResults.end(), [](auto p) {cout << p.second << " "; });
  138. cout << "\n\n";
  139. cout << "Number no primes: " << distance(futResults.begin(), prIt) << endl;
  140. cout << "No primes: " << endl;
  141. for_each(futResults.begin(), prIt, [](auto p) {cout << p.second << " "; });
  142. cout << endl;
  143. }

示例的基本思想是,客户端可以在激活列表上并发地安排作业。线程的工作是确定哪些数是质数。激活列表是活动对象的一部分,而活动对象在一个单独的线程上进行入队操作,并且客户端可以在激活列表中查询作业的结果。

程序的详情:5个客户端通过getFutures将工作(第121 - 126行)入队到activeObjectnumberPrimes中的数字是1000000到1000000000之间(第96行)的随机数,将这些数值放入vector<future<pair<bool, int>>中。future<pair<bool, int>持有一个boolint对,其中bool表示int值是否是质数。再看看第108行:future .push_back(activeObject.enqueueTask(numb))。此调用将触发新作业进入激活列表的队列,所有对激活列表的调用都必须受到保护,这里激活列表是一个promise队列(第89行):deque<pair<packaged_task<pair<bool, int>(int)>, int >>

每个promise在调用执行函数对象IsPrime(第39 - 47行)时,会返回一个boolint对。现在,工作包已经准备好了,开始计算吧。所有客户端在第129 - 133行中返回关联future的句柄,并把所有的future放在一起(第136 - 146行),这样会使工作更加容易。第149行中的调用activeObject.run()启动执行。run(第64 - 72行)启动单独的线程,并执行promises(第68行),直到执行完所有作业(第66行)。isEmpty(第83 - 87行)确定队列是否为空,dequeTask会返回一个新任务。通过在每个future上调用futResults.push_back(fut.get())(第154行),所有结果都会推送到futResults上。第156行对成对的向量进行排序:vector<pair<bool, int>>。其余代码则是给出了计算结果,第159行中的迭代器prIt将第一个迭代器指向一个素数对。

程序打印素数数量为distance(prIt, futResults.end())(第162行),并(第164行)逐一显示。

活动对象 - 图3

拓展阅读