1. Synchronous execution: dispatch_sync

    void
    dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
    {
        uintptr_t dc_flags = DC_FLAG_BLOCK;
        if (unlikely(_dispatch_block_has_private_data(work))) {
            return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
        }
        _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
    }
    // Drill into _dispatch_sync_f
    static void
    _dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
            uintptr_t dc_flags)
    {
        _dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
    }
    // And _dispatch_sync_f_inline
    static inline void
    _dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        if (likely(dq->dq_width == 1)) {
            return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
        }
        if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
            DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
        }
        dispatch_lane_t dl = upcast(dq)._dl;
        // Global concurrent queues and queues bound to non-dispatch threads
        // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
        if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
            return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
        }
        if (unlikely(dq->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
        }
        _dispatch_introspection_sync_begin(dl);
        _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
                _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
    }

You can see that dq_width is checked first to distinguish a serial queue from a concurrent one: a serial queue (width == 1) goes to _dispatch_barrier_sync_f, while a concurrent queue eventually reaches _dispatch_sync_invoke_and_complete. Let's expand the serial-queue synchronous path first:

    static void
    _dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        _dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
    }
    // _dispatch_barrier_sync_f_inline
    static inline void
    _dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
            dispatch_function_t func, uintptr_t dc_flags)
    {
        dispatch_tid tid = _dispatch_tid_self();
        if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
            DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
        }
        dispatch_lane_t dl = upcast(dq)._dl;
        // The more correct thing to do would be to merge the qos of the thread
        // that just acquired the barrier lock into the queue state.
        //
        // However this is too expensive for the fast path, so skip doing it.
        // The chosen tradeoff is that if an enqueue on a lower priority thread
        // contends with this fast path, this thread may receive a useless override.
        //
        // Global concurrent queues and queues bound to non-dispatch threads
        // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
        if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
            return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                    DC_FLAG_BARRIER | dc_flags);
        }
        if (unlikely(dl->do_targetq->do_targetq)) {
            return _dispatch_sync_recurse(dl, ctxt, func,
                    DC_FLAG_BARRIER | dc_flags);
        }
        _dispatch_introspection_sync_begin(dl);
        _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
                DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
    }

The logic above first obtains the thread tid; if the barrier lock cannot be acquired it falls into _dispatch_sync_f_slow, which is where the deadlock case is handled, so let's look at that slow path first:

    static void
    _dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
            dispatch_function_t func, uintptr_t top_dc_flags,
            dispatch_queue_class_t dqu, uintptr_t dc_flags)
    {
        dispatch_queue_t top_dq = top_dqu._dq;
        dispatch_queue_t dq = dqu._dq;
        if (unlikely(!dq->do_targetq)) {
            return _dispatch_sync_function_invoke(dq, ctxt, func);
        }
        pthread_priority_t pp = _dispatch_get_priority();
        struct dispatch_sync_context_s dsc = {
            .dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
            .dc_func     = _dispatch_async_and_wait_invoke,
            .dc_ctxt     = &dsc,
            .dc_other    = top_dq,
            .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
            .dc_voucher  = _voucher_get(),
            .dsc_func    = func,
            .dsc_ctxt    = ctxt,
            .dsc_waiter  = _dispatch_tid_self(),
        };
        _dispatch_trace_item_push(top_dq, &dsc);
        __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
        if (dsc.dsc_func == NULL) {
            // dsc_func being cleared means that the block ran on another thread ie.
            // case (2) as listed in _dispatch_async_and_wait_f_slow.
            dispatch_queue_t stop_dq = dsc.dc_other;
            return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
        }
        _dispatch_introspection_sync_begin(top_dq);
        _dispatch_trace_item_pop(top_dq, &dsc);
        _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func, top_dc_flags
                DISPATCH_TRACE_ARG(&dsc));
    }
    // __DISPATCH_WAIT_FOR_QUEUE__
    static void
    __DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
    {
        uint64_t dq_state = _dispatch_wait_prepare(dq);
        if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
            DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
                    "dispatch_sync called on queue "
                    "already owned by current thread");
        }
        // Blocks submitted to the main thread MUST run on the main thread, and
        // dispatch_async_and_wait also executes on the remote context rather than
        // the current thread.
        //
        // For both these cases we need to save the frame linkage for the sake of
        // _dispatch_async_and_wait_invoke
        _dispatch_thread_frame_save_state(&dsc->dsc_dtf);
        if (_dq_state_is_suspended(dq_state) ||
                _dq_state_is_base_anon(dq_state)) {
            dsc->dc_data = DISPATCH_WLH_ANON;
        } else if (_dq_state_is_base_wlh(dq_state)) {
            dsc->dc_data = (dispatch_wlh_t)dq;
        } else {
            _dispatch_wait_compute_wlh(upcast(dq)._dl, dsc);
        }
        if (dsc->dc_data == DISPATCH_WLH_ANON) {
            dsc->dsc_override_qos_floor = dsc->dsc_override_qos =
                    (uint8_t)_dispatch_get_basepri_override_qos_floor();
            _dispatch_thread_event_init(&dsc->dsc_event);
        }
        dx_push(dq, dsc, _dispatch_qos_from_pp(dsc->dc_priority));
        _dispatch_trace_runtime_event(sync_wait, dq, 0);
        if (dsc->dc_data == DISPATCH_WLH_ANON) {
            _dispatch_thread_event_wait(&dsc->dsc_event); // acquire
        } else {
            _dispatch_event_loop_wait_for_ownership(dsc);
        }
        if (dsc->dc_data == DISPATCH_WLH_ANON) {
            _dispatch_thread_event_destroy(&dsc->dsc_event);
            // If _dispatch_sync_waiter_wake() gave this thread an override,
            // ensure that the root queue sees it.
            if (dsc->dsc_override_qos > dsc->dsc_override_qos_floor) {
                _dispatch_set_basepri_override_qos(dsc->dsc_override_qos);
            }
        }
    }
    // Expand _dq_state_drain_locked_by
    static inline bool
    _dq_state_drain_locked_by(uint64_t dq_state, dispatch_tid tid)
    {
        return _dispatch_lock_is_locked_by((dispatch_lock)dq_state, tid);
    }
    // ...which forwards to _dispatch_lock_is_locked_by
    static inline bool
    _dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
    {
        // equivalent to _dispatch_lock_owner(lock_value) == tid
        return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
    }

Notice that before pushing the waiter onto the queue, __DISPATCH_WAIT_FOR_QUEUE__ uses _dispatch_lock_is_locked_by to check whether the queue about to be waited on is already drain-locked by the waiting thread: it XORs the lock value with the tid and masks with DLOCK_OWNER_MASK, returning YES when they match, in which case the deadlock is reported with DISPATCH_CLIENT_CRASH. If there is no deadlock, the waiter is eventually woken, popped with _dispatch_trace_item_pop, and executed.
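As a minimal illustration (a hypothetical snippet, not from the libdispatch source), this is exactly the check that fires when you dispatch_sync onto the serial queue you are already running on:

    dispatch_queue_t q = dispatch_queue_create("com.example.serial", DISPATCH_QUEUE_SERIAL);
    dispatch_async(q, ^{
        // This thread currently drain-locks q, so the inner wait can never be
        // satisfied; libdispatch crashes with "dispatch_sync called on queue
        // already owned by current thread".
        dispatch_sync(q, ^{
            NSLog(@"never reached");
        });
    });

So how is the work scheduled and executed when there is no deadlock? Look at _dispatch_sync_invoke_and_complete_recurse: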

    static void
    _dispatch_sync_invoke_and_complete_recurse(dispatch_queue_class_t dq,
            void *ctxt, dispatch_function_t func, uintptr_t dc_flags
            DISPATCH_TRACE_ARG(void *dc))
    {
        _dispatch_sync_function_invoke_inline(dq, ctxt, func);
        _dispatch_trace_item_complete(dc);
        _dispatch_sync_complete_recurse(dq._dq, NULL, dc_flags);
    }
    // _dispatch_sync_function_invoke_inline
    static inline void
    _dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
            dispatch_function_t func)
    {
        dispatch_thread_frame_s dtf;
        _dispatch_thread_frame_push(&dtf, dq);
        _dispatch_client_callout(ctxt, func);
        _dispatch_perfmon_workitem_inc();
        _dispatch_thread_frame_pop(&dtf);
    }
    // _dispatch_client_callout
    void
    _dispatch_client_callout(void *ctxt, dispatch_function_t f)
    {
        @try {
            return f(ctxt);
        }
        @catch (...) {
            objc_terminate();
        }
    }

It is quite clear that f is ultimately invoked, and f is exactly the callback block passed in from outside.

2. Asynchronous execution: dispatch_async

Now let's walk through the call path of dispatch_async:

    void
    dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
    {
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        uintptr_t dc_flags = DC_FLAG_CONSUME;
        dispatch_qos_t qos;
        qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
        _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
    }
    // _dispatch_continuation_init mainly initializes the block
    static inline dispatch_qos_t
    _dispatch_continuation_init(dispatch_continuation_t dc,
            dispatch_queue_class_t dqu, dispatch_block_t work,
            dispatch_block_flags_t flags, uintptr_t dc_flags)
    {
        void *ctxt = _dispatch_Block_copy(work);
        dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
        if (unlikely(_dispatch_block_has_private_data(work))) {
            dc->dc_flags = dc_flags;
            dc->dc_ctxt = ctxt;
            // will initialize all fields but requires dc_flags & dc_ctxt to be set
            return _dispatch_continuation_init_slow(dc, dqu, flags);
        }
        dispatch_function_t func = _dispatch_Block_invoke(work);
        if (dc_flags & DC_FLAG_CONSUME) {
            func = _dispatch_call_block_and_release;
        }
        return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
    }
    // Next, _dispatch_continuation_async
    static inline void
    _dispatch_continuation_async(dispatch_queue_class_t dqu,
            dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
    {
    #if DISPATCH_INTROSPECTION
        if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
            _dispatch_trace_item_push(dqu, dc);
        }
    #else
        (void)dc_flags;
    #endif
        return dx_push(dqu._dq, dc, qos);
    }
    // dx_push is a macro:
    #define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
    // Essentially it calls dq_push through the object's vtable (i.e. the object's
    // own push implementation). Assuming the async call targets a global queue,
    // dq_push resolves to:
    DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
        .do_type     = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
        .do_dispose  = _dispatch_object_no_dispose,
        .do_debug    = _dispatch_queue_debug,
        .do_invoke   = _dispatch_object_no_invoke,
        .dq_activate = _dispatch_queue_no_activate,
        .dq_wakeup   = _dispatch_root_queue_wakeup,
        .dq_push     = _dispatch_root_queue_push,
    );

So for a global queue dx_push lands in _dispatch_root_queue_push; let's continue there:

    void
    _dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
            dispatch_qos_t qos)
    {
    #if DISPATCH_USE_KEVENT_WORKQUEUE
        dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
        if (unlikely(ddi && ddi->ddi_can_stash)) {
            dispatch_object_t old_dou = ddi->ddi_stashed_dou;
            dispatch_priority_t rq_overcommit;
            rq_overcommit = rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
            if (likely(!old_dou._do || rq_overcommit)) {
                dispatch_queue_global_t old_rq = ddi->ddi_stashed_rq;
                dispatch_qos_t old_qos = ddi->ddi_stashed_qos;
                ddi->ddi_stashed_rq = rq;
                ddi->ddi_stashed_dou = dou;
                ddi->ddi_stashed_qos = qos;
                _dispatch_debug("deferring item %p, rq %p, qos %d",
                        dou._do, rq, qos);
                if (rq_overcommit) {
                    ddi->ddi_can_stash = false;
                }
                if (likely(!old_dou._do)) {
                    return;
                }
                // push the previously stashed item
                qos = old_qos;
                rq = old_rq;
                dou = old_dou;
            }
        }
    #endif
    #if HAVE_PTHREAD_WORKQUEUE_QOS
        if (_dispatch_root_queue_push_needs_override(rq, qos)) {
            return _dispatch_root_queue_push_override(rq, dou, qos);
        }
    #else
        (void)qos;
    #endif
        _dispatch_root_queue_push_inline(rq, dou, dou, 1);
    }
    // In most builds HAVE_PTHREAD_WORKQUEUE_QOS holds, so
    // _dispatch_root_queue_push_override runs (it compares the item's qos with the
    // root queue's qos, and they almost never match)
    static void
    _dispatch_root_queue_push_override(dispatch_queue_global_t orig_rq,
            dispatch_object_t dou, dispatch_qos_t qos)
    {
        bool overcommit = orig_rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
        dispatch_queue_global_t rq = _dispatch_get_root_queue(qos, overcommit);
        dispatch_continuation_t dc = dou._dc;
        if (_dispatch_object_is_redirection(dc)) {
            // no double-wrap is needed, _dispatch_async_redirect_invoke will do
            // the right thing
            dc->dc_func = (void *)orig_rq;
        } else {
            dc = _dispatch_continuation_alloc();
            dc->do_vtable = DC_VTABLE(OVERRIDE_OWNING);
            dc->dc_ctxt = dc;
            dc->dc_other = orig_rq;
            dc->dc_data = dou._do;
            dc->dc_priority = DISPATCH_NO_PRIORITY;
            dc->dc_voucher = DISPATCH_NO_VOUCHER;
        }
        _dispatch_root_queue_push_inline(rq, dc, dc, 1);
    }
    /*
     * _dispatch_object_is_redirection above is simply
     * return _dispatch_object_has_type(dou, DISPATCH_CONTINUATION_TYPE(ASYNC_REDIRECT));
     * so items redirected from a custom queue take the if branch, while items
     * pushed straight to a dispatch_get_global_queue do not.
     * Now expand _dispatch_root_queue_push_inline. Note its if statement, which
     * links the item into the queue; most pushes do not enter the branch. The very
     * first item, however, does satisfy the condition and enters the branch to
     * activate (poke) the queue so it starts executing its tasks; once the queue is
     * active, later items no longer need to, so the activation effectively happens
     * only once.
     */
    static inline void
    _dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
            dispatch_object_t _head, dispatch_object_t _tail, int n)
    {
        struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
        if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
            return _dispatch_root_queue_poke(dq, n, 0);
        }
    }
    // As you can see, tasks submitted to a custom queue all get thrown onto the
    // root queue it targets; the queue we create ourselves is really just a proxy.
    // Continue with _dispatch_root_queue_poke:
    void
    _dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
    {
        if (!_dispatch_queue_class_probe(dq)) {
            return;
        }
    #if !DISPATCH_USE_INTERNAL_WORKQUEUE
    #if DISPATCH_USE_PTHREAD_POOL
        if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE))
    #endif
        {
            if (unlikely(!os_atomic_cmpxchg2o(dq, dgq_pending, 0, n, relaxed))) {
                _dispatch_root_queue_debug("worker thread request still pending "
                        "for global queue: %p", dq);
                return;
            }
        }
    #endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
        return _dispatch_root_queue_poke_slow(dq, n, floor);
    }
    // And _dispatch_root_queue_poke_slow
    static void
    _dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
    {
        int remaining = n;
        int r = ENOSYS;
        _dispatch_root_queues_init();
        _dispatch_debug_root_queue(dq, __func__);
        _dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);
    #if !DISPATCH_USE_INTERNAL_WORKQUEUE
    #if DISPATCH_USE_PTHREAD_ROOT_QUEUES
        if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
    #endif
        {
            _dispatch_root_queue_debug("requesting new worker thread for global "
                    "queue: %p", dq);
            r = _pthread_workqueue_addthreads(remaining,
                    _dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
            (void)dispatch_assume_zero(r);
            return;
        }
    #endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
    #if DISPATCH_USE_PTHREAD_POOL
        dispatch_pthread_root_queue_context_t pqc = dq->do_ctxt;
        if (likely(pqc->dpq_thread_mediator.do_vtable)) {
            while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
                _dispatch_root_queue_debug("signaled sleeping worker for "
                        "global queue: %p", dq);
                if (!--remaining) {
                    return;
                }
            }
        }
        bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
        if (overcommit) {
            os_atomic_add2o(dq, dgq_pending, remaining, relaxed);
        } else {
            if (!os_atomic_cmpxchg2o(dq, dgq_pending, 0, remaining, relaxed)) {
                _dispatch_root_queue_debug("worker thread request still pending for "
                        "global queue: %p", dq);
                return;
            }
        }
        int can_request, t_count;
        // seq_cst with atomic store to tail <rdar://problem/16932833>
        t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
        do {
            can_request = t_count < floor ? 0 : t_count - floor;
            if (remaining > can_request) {
                _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
                        remaining, can_request);
                os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
                remaining = can_request;
            }
            if (remaining == 0) {
                _dispatch_root_queue_debug("pthread pool is full for root queue: "
                        "%p", dq);
                return;
            }
        } while (!os_atomic_cmpxchgvw2o(dq, dgq_thread_pool_size, t_count,
                t_count - remaining, &t_count, acquire));
    #if !defined(_WIN32)
        pthread_attr_t *attr = &pqc->dpq_thread_attr;
        pthread_t tid, *pthr = &tid;
    #if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
        if (unlikely(dq == &_dispatch_mgr_root_queue)) {
            pthr = _dispatch_mgr_root_queue_init();
        }
    #endif
        do {
            _dispatch_retain(dq); // released in _dispatch_worker_thread
            while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
                if (r != EAGAIN) {
                    (void)dispatch_assume_zero(r);
                }
                _dispatch_temporary_resource_shortage();
            }
        } while (--remaining);
    #else // defined(_WIN32)
    #if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
        if (unlikely(dq == &_dispatch_mgr_root_queue)) {
            _dispatch_mgr_root_queue_init();
        }
    #endif
        do {
            _dispatch_retain(dq); // released in _dispatch_worker_thread
    #if DISPATCH_DEBUG
            unsigned dwStackSize = 0;
    #else
            unsigned dwStackSize = 64 * 1024;
    #endif
            uintptr_t hThread = 0;
            while (!(hThread = _beginthreadex(NULL, dwStackSize,
                    _dispatch_worker_thread_thunk, dq,
                    STACK_SIZE_PARAM_IS_A_RESERVATION, NULL))) {
                if (errno != EAGAIN) {
                    (void)dispatch_assume(hThread);
                }
                _dispatch_temporary_resource_shortage();
            }
            if (_dispatch_mgr_sched.prio > _dispatch_mgr_sched.default_prio) {
                (void)dispatch_assume_zero(SetThreadPriority((HANDLE)hThread,
                        _dispatch_mgr_sched.prio) == TRUE);
            }
            CloseHandle((HANDLE)hThread);
        } while (--remaining);
    #endif // defined(_WIN32)
    #else
        (void)floor;
    #endif // DISPATCH_USE_PTHREAD_POOL
    }

At this point it is clear that global queues request new worker threads via _pthread_workqueue_addthreads, while the other (pthread-pool) root queues spawn threads with pthread_create. But why haven't we seen the code that executes the tasks? The answer starts in _dispatch_root_queues_init, which runs first and registers the worker-thread entry points:

    static inline void
    _dispatch_root_queues_init(void)
    {
        dispatch_once_f(&_dispatch_root_queues_pred, NULL,
                _dispatch_root_queues_init_once);
    }
    // dispatch_once_f is analyzed in the dispatch_once section below, so we won't
    // expand it here; look at _dispatch_root_queues_init_once
    static void
    _dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
    {
        _dispatch_fork_becomes_unsafe();
    #if DISPATCH_USE_INTERNAL_WORKQUEUE
        size_t i;
        for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
            _dispatch_root_queue_init_pthread_pool(&_dispatch_root_queues[i], 0,
                    _dispatch_root_queues[i].dq_priority);
        }
    #else
        int wq_supported = _pthread_workqueue_supported();
        int r = ENOTSUP;
        if (!(wq_supported & WORKQ_FEATURE_MAINTENANCE)) {
            DISPATCH_INTERNAL_CRASH(wq_supported,
                    "QoS Maintenance support required");
        }
    #if DISPATCH_USE_KEVENT_SETUP
        struct pthread_workqueue_config cfg = {
            .version = PTHREAD_WORKQUEUE_CONFIG_VERSION,
            .flags = 0,
            .workq_cb = 0,
            .kevent_cb = 0,
            .workloop_cb = 0,
            .queue_serialno_offs = dispatch_queue_offsets.dqo_serialnum,
    #if PTHREAD_WORKQUEUE_CONFIG_VERSION >= 2
            .queue_label_offs = dispatch_queue_offsets.dqo_label,
    #endif
        };
    #endif
    #pragma clang diagnostic push
    #pragma clang diagnostic ignored "-Wunreachable-code"
        if (unlikely(!_dispatch_kevent_workqueue_enabled)) {
    #if DISPATCH_USE_KEVENT_SETUP
            cfg.workq_cb = _dispatch_worker_thread2;
            r = pthread_workqueue_setup(&cfg, sizeof(cfg));
    #else
            r = _pthread_workqueue_init(_dispatch_worker_thread2,
                    offsetof(struct dispatch_queue_s, dq_serialnum), 0);
    #endif // DISPATCH_USE_KEVENT_SETUP
    #if DISPATCH_USE_KEVENT_WORKLOOP
        } else if (wq_supported & WORKQ_FEATURE_WORKLOOP) {
    #if DISPATCH_USE_KEVENT_SETUP
            cfg.workq_cb = _dispatch_worker_thread2;
            cfg.kevent_cb = (pthread_workqueue_function_kevent_t)
                    _dispatch_kevent_worker_thread;
            cfg.workloop_cb = (pthread_workqueue_function_workloop_t)
                    _dispatch_workloop_worker_thread;
            r = pthread_workqueue_setup(&cfg, sizeof(cfg));
    #else
            r = _pthread_workqueue_init_with_workloop(_dispatch_worker_thread2,
                    (pthread_workqueue_function_kevent_t)
                    _dispatch_kevent_worker_thread,
                    (pthread_workqueue_function_workloop_t)
                    _dispatch_workloop_worker_thread,
                    offsetof(struct dispatch_queue_s, dq_serialnum), 0);
    #endif // DISPATCH_USE_KEVENT_SETUP
    #endif // DISPATCH_USE_KEVENT_WORKLOOP
    #if DISPATCH_USE_KEVENT_WORKQUEUE
        } else if (wq_supported & WORKQ_FEATURE_KEVENT) {
    #if DISPATCH_USE_KEVENT_SETUP
            cfg.workq_cb = _dispatch_worker_thread2;
            cfg.kevent_cb = (pthread_workqueue_function_kevent_t)
                    _dispatch_kevent_worker_thread;
            r = pthread_workqueue_setup(&cfg, sizeof(cfg));
    #else
            r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread2,
                    (pthread_workqueue_function_kevent_t)
                    _dispatch_kevent_worker_thread,
                    offsetof(struct dispatch_queue_s, dq_serialnum), 0);
    #endif // DISPATCH_USE_KEVENT_SETUP
    #endif
        } else {
            DISPATCH_INTERNAL_CRASH(wq_supported, "Missing Kevent WORKQ support");
        }
    #pragma clang diagnostic pop
        if (r != 0) {
            DISPATCH_INTERNAL_CRASH((r << 16) | wq_supported,
                    "Root queue initialization failed");
        }
    #endif // DISPATCH_USE_INTERNAL_WORKQUEUE
    }
    // Continue with _dispatch_workloop_worker_thread
    static void
    _dispatch_workloop_worker_thread(uint64_t *workloop_id,
            dispatch_kevent_t *events, int *nevents)
    {
        if (!dispatch_assume(workloop_id && events && nevents)) {
            return;
        }
        if (!dispatch_assume(*workloop_id != 0)) {
            return _dispatch_kevent_worker_thread(events, nevents);
        }
        if (*nevents == 0 || *events == NULL) {
            // events for worker thread request have already been delivered earlier
            // or got cancelled before point of no return concurrently
            return;
        }
        dispatch_wlh_t wlh = (dispatch_wlh_t)*workloop_id;
        _dispatch_adopt_wlh(wlh);
        _dispatch_wlh_worker_thread(wlh, *events, nevents);
        _dispatch_preserve_wlh_storage_reference(wlh);
    }
    // And _dispatch_worker_thread2
    static void
    _dispatch_worker_thread2(pthread_priority_t pp)
    {
        bool overcommit = pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
        dispatch_queue_global_t dq;
        pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
        _dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
        dq = _dispatch_get_root_queue(_dispatch_qos_from_pp(pp), overcommit);
        _dispatch_introspection_thread_add();
        _dispatch_trace_runtime_event(worker_unpark, dq, 0);
        int pending = os_atomic_dec2o(dq, dgq_pending, relaxed);
        dispatch_assert(pending >= 0);
        _dispatch_root_queue_drain(dq, dq->dq_priority,
                DISPATCH_INVOKE_WORKER_DRAIN | DISPATCH_INVOKE_REDIRECTING_DRAIN);
        _dispatch_voucher_debug("root queue clear", NULL);
        _dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
        _dispatch_trace_runtime_event(worker_park, NULL, 0);
    }
    // _dispatch_root_queue_drain
    static void
    _dispatch_root_queue_drain(dispatch_queue_global_t dq,
            dispatch_priority_t pri, dispatch_invoke_flags_t flags)
    {
    #if DISPATCH_DEBUG
        dispatch_queue_t cq;
        if (unlikely(cq = _dispatch_queue_get_current())) {
            DISPATCH_INTERNAL_CRASH(cq, "Premature thread recycling");
        }
    #endif
        _dispatch_queue_set_current(dq);
        _dispatch_init_basepri(pri);
        _dispatch_adopt_wlh_anon();
        struct dispatch_object_s *item;
        bool reset = false;
        dispatch_invoke_context_s dic = { };
    #if DISPATCH_COCOA_COMPAT
        _dispatch_last_resort_autorelease_pool_push(&dic);
    #endif // DISPATCH_COCOA_COMPAT
        _dispatch_queue_drain_init_narrowing_check_deadline(&dic, pri);
        _dispatch_perfmon_start();
        while (likely(item = _dispatch_root_queue_drain_one(dq))) {
            if (reset) _dispatch_wqthread_override_reset();
            _dispatch_continuation_pop_inline(item, &dic, flags, dq);
            reset = _dispatch_reset_basepri_override();
            if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) {
                break;
            }
        }
        // overcommit or not. worker thread
        if (pri & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
            _dispatch_perfmon_end(perfmon_thread_worker_oc);
        } else {
            _dispatch_perfmon_end(perfmon_thread_worker_non_oc);
        }
    #if DISPATCH_COCOA_COMPAT
        _dispatch_last_resort_autorelease_pool_pop(&dic);
    #endif // DISPATCH_COCOA_COMPAT
        _dispatch_reset_wlh();
        _dispatch_clear_basepri();
        _dispatch_queue_set_current(NULL);
    }
    // _dispatch_continuation_pop_inline is the dequeue operation. Note that it
    // first checks for a vtable (_dispatch_object_has_vtable). This explains why
    // dispatch_barrier_async, whose main flow is identical to dispatch_async, has
    // no barrier effect on a global queue: there the continuation has no vtable
    // structure, so it is invoked directly, exactly like dispatch_async
    static inline void
    _dispatch_continuation_pop_inline(dispatch_object_t dou,
            dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
            dispatch_queue_class_t dqu)
    {
        dispatch_pthread_root_queue_observer_hooks_t observer_hooks =
                _dispatch_get_pthread_root_queue_observer_hooks();
        if (observer_hooks) observer_hooks->queue_will_execute(dqu._dq);
        flags &= _DISPATCH_INVOKE_PROPAGATE_MASK;
        if (_dispatch_object_has_vtable(dou)) {
            dx_invoke(dou._dq, dic, flags);
        } else {
            _dispatch_continuation_invoke_inline(dou, flags, dqu);
        }
        if (observer_hooks) observer_hooks->queue_did_execute(dqu._dq);
    }
    // _dispatch_continuation_invoke_inline: _dispatch_client_callout is the call
    // that actually runs the block. There is one other path that is not taken
    // here, _dispatch_continuation_with_group_invoke, which dispatch_group
    // (covered later) relies on
    static inline void
    _dispatch_continuation_invoke_inline(dispatch_object_t dou,
            dispatch_invoke_flags_t flags, dispatch_queue_class_t dqu)
    {
        dispatch_continuation_t dc = dou._dc, dc1;
        dispatch_invoke_with_autoreleasepool(flags, {
            uintptr_t dc_flags = dc->dc_flags;
            // Add the item back to the cache before calling the function. This
            // allows the 'hot' continuation to be used for a quick callback.
            //
            // The ccache version is per-thread.
            // Therefore, the object has not been reused yet.
            // This generates better assembly.
            _dispatch_continuation_voucher_adopt(dc, dc_flags);
            if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
                _dispatch_trace_item_pop(dqu, dou);
            }
            if (dc_flags & DC_FLAG_CONSUME) {
                dc1 = _dispatch_continuation_free_cacheonly(dc);
            } else {
                dc1 = NULL;
            }
            if (unlikely(dc_flags & DC_FLAG_GROUP_ASYNC)) {
                _dispatch_continuation_with_group_invoke(dc);
            } else {
                _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
                _dispatch_trace_item_complete(dc);
            }
            if (unlikely(dc1)) {
                _dispatch_continuation_free_to_cache_limit(dc1);
            }
        });
        _dispatch_perfmon_workitem_inc();
    }

One piece of _dispatch_continuation_init we did not expand earlier: the func it stores is _dispatch_call_block_and_release (source below), which is kept in the continuation and carried along with the qos when dx_push runs:

    void
    _dispatch_call_block_and_release(void *block)
    {
        void (^b)(void) = block;
        b();
        Block_release(b);
    }

The implementation of dispatch_async looks complicated because of the many data structures and branches involved, but the idea is simple: the queue maintains a FIFO linked list of all submitted blocks (new blocks are appended at the tail), and the underlying thread pool takes blocks off the list one by one and executes them.
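A quick demonstration of that FIFO behavior (a hypothetical snippet): on a serial queue the blocks below always run in submission order, because each dispatch_async merely appends to the queue's list:

    dispatch_queue_t q = dispatch_queue_create("com.example.fifo", DISPATCH_QUEUE_SERIAL);
    for (int i = 0; i < 5; i++) {
        dispatch_async(q, ^{
            NSLog(@"task %d", i); // prints 0, 1, 2, 3, 4 in order
        });
    }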

Similarly, the source of dispatch_barrier_async is almost identical to dispatch_async; the only difference is the extra DC_FLAG_BARRIER flag. That flag is consulted when tasks are dequeued: normal async items are taken out one after another, but when a DC_FLAG_BARRIER item is encountered the drain returns, so the barrier runs only after all previously submitted tasks have finished. (A reminder: dispatch_barrier_async only has a barrier effect on a custom concurrent queue. On a global queue there is no vtable structure for the barrier path, so it behaves just like dispatch_async, and on a serial queue such as the main queue a barrier adds nothing, since every item there is effectively a barrier already.)

    void
    dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
    {
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_BARRIER;
        dispatch_qos_t qos;
        qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
        _dispatch_continuation_async(dq, dc, qos, dc_flags);
    }
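For completeness, a typical barrier use on a custom concurrent queue (a hypothetical snippet): the reads may run concurrently, while the barrier block gets the queue to itself:

    dispatch_queue_t q = dispatch_queue_create("com.example.rw", DISPATCH_QUEUE_CONCURRENT);
    dispatch_async(q, ^{ NSLog(@"read 1"); });
    dispatch_async(q, ^{ NSLog(@"read 2"); });
    dispatch_barrier_async(q, ^{
        NSLog(@"write: runs alone, after read 1 and read 2");
    });
    dispatch_async(q, ^{ NSLog(@"read 3: runs after the write"); });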

3. One-time execution: dispatch_once

The following code should be familiar to any objc developer; it guarantees that instance is created only once:

    + (instancetype)sharedInstance {
        static MyClass *instance;
        static dispatch_once_t onceToken;
        dispatch_once(&onceToken, ^{
            instance = [[MyClass alloc] init];
        });
        return instance;
    }

Let's analyze the dispatch_once source:

    void
    dispatch_once(dispatch_once_t *val, dispatch_block_t block)
    {
        dispatch_once_f(val, block, _dispatch_Block_invoke(block));
    }
    // Expand dispatch_once_f
    void
    dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
    {
        dispatch_once_gate_t l = (dispatch_once_gate_t)val;
    #if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
        uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
        if (likely(v == DLOCK_ONCE_DONE)) {
            return;
        }
    #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
        if (likely(DISPATCH_ONCE_IS_GEN(v))) {
            return _dispatch_once_mark_done_if_quiesced(l, v);
        }
    #endif
    #endif
        if (_dispatch_once_gate_tryenter(l)) {
            return _dispatch_once_callout(l, ctxt, func);
        }
        return _dispatch_once_wait(l);
    }
    // If os_atomic_load reads DLOCK_ONCE_DONE it returns right away; otherwise
    // _dispatch_once_gate_tryenter atomically swaps the gate from
    // DLOCK_ONCE_UNLOCKED to the current thread's lock value, i.e. it takes the
    // gate only if no thread has claimed it yet
    static inline bool
    _dispatch_once_gate_tryenter(dispatch_once_gate_t l)
    {
        return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
                (uintptr_t)_dispatch_lock_value_for_self(), relaxed);
    }
    // The winner of the gate runs _dispatch_once_callout, which mainly invokes the block
    static void
    _dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
            dispatch_function_t func)
    {
        _dispatch_client_callout(ctxt, func);
        _dispatch_once_gate_broadcast(l);
    }
    // After the block has run, _dispatch_once_gate_broadcast is called
    static inline void
    _dispatch_once_gate_broadcast(dispatch_once_gate_t l)
    {
        dispatch_lock value_self = _dispatch_lock_value_for_self();
        uintptr_t v;
    #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
        v = _dispatch_once_mark_quiescing(l);
    #else
        v = _dispatch_once_mark_done(l);
    #endif
        if (likely((dispatch_lock)v == value_self)) return;
        _dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
    }
    // Since execution has finished, _dispatch_once_gate_broadcast marks the gate
    // as done via _dispatch_once_mark_done
    static inline uintptr_t
    _dispatch_once_mark_done(dispatch_once_gate_t dgo)
    {
        return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
    }

4. Semaphores: dispatch_semaphore

Semaphores are one of the most commonly used thread-synchronization tools. The relevant APIs:

  • dispatch_semaphore_t: the semaphore type
  • dispatch_semaphore_create: creates a semaphore with an initial value
  • dispatch_semaphore_wait: decrements the semaphore by 1 and waits; if the value before the call was <= 0 the thread blocks until a signal arrives or the timeout expires, otherwise execution continues immediately (see the sketch below)
  • dispatch_semaphore_signal: increments the semaphore by 1, waking a waiting thread if there is one
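As a minimal sketch of these semantics (a hypothetical snippet), a semaphore created with a value of 2 lets at most two of the blocks below run at the same time:

    dispatch_semaphore_t sema = dispatch_semaphore_create(2);
    dispatch_queue_t q = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
    for (int i = 0; i < 10; i++) {
        dispatch_async(q, ^{
            dispatch_semaphore_wait(sema, DISPATCH_TIME_FOREVER); // -1, blocks when no slot is free
            NSLog(@"working on %d", i);
            dispatch_semaphore_signal(sema); // +1, wakes one waiter
        });
    }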

For example, suppose an operation run() has already started on a background thread, and the user may manually trigger watch(), which depends on run having completed; a semaphore handles this nicely:

    - (void)run {
        dispatch_semaphore_t semaphore = dispatch_semaphore_create(0);
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
            // do the actual work here...
            // TODO:
            // signal when it finishes
            dispatch_semaphore_signal(semaphore);
        });
        self->semaphore = semaphore;
    }
    - (void)watch {
        // Wait for the work above to finish; if it hasn't within 60s, time out and
        // continue with the logic below anyway
        dispatch_semaphore_wait(self.semaphore, dispatch_time(DISPATCH_TIME_NOW, 60*NSEC_PER_SEC));
        // do the dependent work here...
        // TODO:
    }

So how is the semaphore implemented? Let's look at its source:

    // First, dispatch_semaphore_t: just like the other types, it is essentially a
    // dispatch_semaphore_s; dsema_value is the current value, dsema_orig the
    // initial value
    DISPATCH_CLASS_DECL(semaphore, OBJECT);
    struct dispatch_semaphore_s {
        DISPATCH_OBJECT_HEADER(semaphore);
        long volatile dsema_value;
        long dsema_orig;
        _dispatch_sema4_t dsema_sema;
    };
    // dispatch_semaphore_create is not complicated: it allocates the
    // DISPATCH_VTABLE(semaphore) object and stores the initial value. Note that it
    // also assigns a target queue: the DISPATCH_QUEUE_PRIORITY_DEFAULT
    // non-overcommit queue
    dispatch_semaphore_t
    dispatch_semaphore_create(long value)
    {
        dispatch_semaphore_t dsema;
        // If the internal value is negative, then the absolute of the value is
        // equal to the number of waiting threads. Therefore it is bogus to
        // initialize the semaphore with a negative value.
        if (value < 0) {
            return DISPATCH_BAD_INPUT;
        }
        dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
                sizeof(struct dispatch_semaphore_s));
        dsema->do_next = DISPATCH_OBJECT_LISTLESS;
        dsema->do_targetq = _dispatch_get_default_queue(false);
        dsema->dsema_value = value;
        _dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
        dsema->dsema_orig = value;
        return dsema;
    }
    // dispatch_semaphore_wait: os_atomic_dec2o first decrements the value. If the
    // result is still >= 0 there is nothing more to do and execution simply
    // continues; otherwise _dispatch_semaphore_wait_slow waits to be signaled or to
    // time out
    long
    dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
    {
        long value = os_atomic_dec2o(dsema, dsema_value, acquire);
        if (likely(value >= 0)) {
            return 0;
        }
        return _dispatch_semaphore_wait_slow(dsema, timeout);
    }
    // _dispatch_semaphore_wait_slow first handles the two extreme cases:
    // DISPATCH_TIME_NOW re-increments the value and returns the timeout result,
    // DISPATCH_TIME_FOREVER waits indefinitely; any other timeout goes through
    // _dispatch_sema4_timedwait
    static long
    _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
            dispatch_time_t timeout)
    {
        long orig;
        _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
        switch (timeout) {
        default:
            if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
                break;
            }
            // Fall through and try to undo what the fast path did to
            // dsema->dsema_value
        case DISPATCH_TIME_NOW:
            orig = dsema->dsema_value;
            while (orig < 0) {
                if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
                        &orig, relaxed)) {
                    return _DSEMA4_TIMEOUT();
                }
            }
            // Another thread called semaphore_signal().
            // Fall through and drain the wakeup.
        case DISPATCH_TIME_FOREVER:
            _dispatch_sema4_wait(&dsema->dsema_sema);
            break;
        }
        return 0;
    }
    // _dispatch_sema4_timedwait calls the Mach kernel function
    // semaphore_timedwait, waiting for a signal until the timeout expires
    bool
    _dispatch_sema4_timedwait(_dispatch_sema4_t *sema, dispatch_time_t timeout)
    {
        mach_timespec_t _timeout;
        kern_return_t kr;
        do {
            uint64_t nsec = _dispatch_timeout(timeout);
            _timeout.tv_sec = (__typeof__(_timeout.tv_sec))(nsec / NSEC_PER_SEC);
            _timeout.tv_nsec = (__typeof__(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);
            kr = semaphore_timedwait(*sema, _timeout);
        } while (unlikely(kr == KERN_ABORTED));
        if (kr == KERN_OPERATION_TIMED_OUT) {
            return true;
        }
        DISPATCH_SEMAPHORE_VERIFY_KR(kr);
        return false;
    }
    // Finally, dispatch_semaphore_signal: increment the value first. If it is now
    // greater than 0 there is nothing to do (usually nobody is blocked in
    // dispatch_semaphore_wait at that point); a value of LONG_MIN means an
    // unbalanced signal; otherwise _dispatch_semaphore_signal_slow wakes a waiter
    long
    dispatch_semaphore_signal(dispatch_semaphore_t dsema)
    {
        long value = os_atomic_inc2o(dsema, dsema_value, release);
        if (likely(value > 0)) {
            return 0;
        }
        if (unlikely(value == LONG_MIN)) {
            DISPATCH_CLIENT_CRASH(value,
                    "Unbalanced call to dispatch_semaphore_signal()");
        }
        return _dispatch_semaphore_signal_slow(dsema);
    }
    // _dispatch_semaphore_signal_slow asks the kernel to wake a thread; as the
    // Apple API doc puts it, "This function returns non-zero if a thread is
    // woken. Otherwise, zero is returned."
    long
    _dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
    {
        _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
        _dispatch_sema4_signal(&dsema->dsema_sema, 1);
        return 1;
    }
    // _dispatch_sema4_signal (the POSIX variant shown here uses sem_post)
    void
    _dispatch_sema4_signal(_dispatch_sema4_t *sema, long count)
    {
        do {
            int ret = sem_post(sema);
            DISPATCH_SEMAPHORE_VERIFY_RET(ret);
        } while (--count);
    }

Semaphores are an important tool, and using them well can make your code more elegant. Consider a common situation: PHImageManager.requestImage is a notoriously memory-hungry call. If we need to fetch a batch of images and then process them, we can't simply fire all the requests in a for loop, or memory blows up quickly: every requestImage takes a lot of memory, and even wrapping the loop body in an autoreleasepool may not release it in time (the loop runs far faster than the releases can happen). Since requestImage is asynchronous, the only cure is to let one request finish before starting the next; the problem reduces to executing the async operations inside the for loop serially. There are a few approaches: 1. use requestImage's synchronous request mode; 2. drop the for loop and recurse, starting each request in the previous one's completion; 3. use a semaphore. The first is not universal, since many async operations cannot simply be made synchronous; the second is more universal, but recursion restructures the code and isn't exactly elegant; the semaphore under discussion is the nicer way. Suppose requestImage is a watch(callback:((_ image)-> Void)) operation and the whole request is run(callback:((_ images)->Void)); then it can be implemented as follows:

    - (void)run:(CallbackWithImages)callback {
        dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
        dispatch_semaphore_t semaphore = dispatch_semaphore_create(0);
        dispatch_async(globalQueue, ^{
            NSMutableArray *array = [[NSMutableArray alloc] init];
            for (int i = 0; i < 100; ++i) {
                [self watch:^(UIImage *image) {
                    [array addObject:image];
                    dispatch_semaphore_signal(semaphore);
                }];
                dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
            }
            dispatch_async(dispatch_get_main_queue(), ^{
                callback([array copy]);
            });
        });
    }
    - (void)watch:(CallbackWithImage)callback {
        dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
        dispatch_async(globalQueue, ^{
            callback([UIImage new]);
        });
    }

5. Dispatch groups: dispatch_group

dispatch_group is commonly used to synchronize multiple tasks (note that unlike dispatch_barrier_sync it can synchronize tasks across multiple queues). This is in fact why we analyzed dispatch_semaphore first: group synchronization has historically been built on semaphores. Typical usage:

    dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
    dispatch_group_t group = dispatch_group_create();
    dispatch_group_async(group, globalQueue, ^{
        sleep(10);
        NSLog(@"task 1 done");
    });
    dispatch_group_async(group, globalQueue, ^{
        NSLog(@"task 2 done");
    });
    dispatch_group_notify(group, globalQueue, ^{
        NSLog(@"both tasks done");
    });
    dispatch_async(globalQueue, ^{
        // Wait at most 5s, then continue; at that point the group's tasks may not
        // all have finished. Note: dispatch_group_wait is a synchronous call, so
        // run it on an async queue or it will block the current thread.
        dispatch_group_wait(group, dispatch_time(DISPATCH_TIME_NOW, 5*NSEC_PER_SEC));
        NSLog(@"waited to the limit, moving on...");
    });

Now let's look at the dispatch_group source:

    // Like the other types, dispatch_group_t is essentially a dispatch_group_s
    // pointer; the parts to focus on are dg_state and dg_bits, which serve as a counter
    struct dispatch_group_s {
        DISPATCH_OBJECT_HEADER(group);
        DISPATCH_UNION_LE(uint64_t volatile dg_state,
                uint32_t dg_bits,
                uint32_t dg_gen
        ) DISPATCH_ATOMIC64_ALIGN;
        struct dispatch_continuation_s *volatile dg_notify_head;
        struct dispatch_continuation_s *volatile dg_notify_tail;
    };
    // dispatch_group_create
    dispatch_group_t
    dispatch_group_create(void)
    {
        return _dispatch_group_create_with_count(0);
    }
    // _dispatch_group_create_with_count just allocates a dispatch_group_s, with
    // do_targetq set to the default, non-overcommit queue
    static inline dispatch_group_t
    _dispatch_group_create_with_count(uint32_t n)
    {
        dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
                sizeof(struct dispatch_group_s));
        dg->do_next = DISPATCH_OBJECT_LISTLESS;
        dg->do_targetq = _dispatch_get_default_queue(false);
        if (n) {
            os_atomic_store2o(dg, dg_bits,
                    (uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
            os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
        }
        return dg;
    }
    // dispatch_group_enter: the core is os_atomic_sub_orig2o, a -1 on dg_bits
    void
    dispatch_group_enter(dispatch_group_t dg)
    {
        // The value is decremented on a 32bits wide atomic so that the carry
        // for the 0 -> -1 transition is not propagated to the upper 32bits.
        uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
                DISPATCH_GROUP_VALUE_INTERVAL, acquire);
        uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
        if (unlikely(old_value == 0)) {
            _dispatch_retain(dg); // <rdar://problem/22318411>
        }
        if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
            DISPATCH_CLIENT_CRASH(old_bits,
                    "Too many nested calls to dispatch_group_enter()");
        }
    }
    // dispatch_group_leave: the core is os_atomic_add_orig2o, a +1 on dg_state. If
    // the old value was 0, there was no matching dispatch_group_enter and it
    // crashes; the really interesting part here is _dispatch_group_wake
    void
    dispatch_group_leave(dispatch_group_t dg)
    {
        // The value is incremented on a 64bits wide atomic so that the carry for
        // the -1 -> 0 transition increments the generation atomically.
        uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
                DISPATCH_GROUP_VALUE_INTERVAL, release);
        uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
        if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
            old_state += DISPATCH_GROUP_VALUE_INTERVAL;
            do {
                new_state = old_state;
                if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
                    new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
                    new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
                } else {
                    // If the group was entered again since the atomic_add above,
                    // we can't clear the waiters bit anymore as we don't know for
                    // which generation the waiters are for
                    new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
                }
                if (old_state == new_state) break;
            } while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
                    old_state, new_state, &old_state, relaxed)));
            return _dispatch_group_wake(dg, old_state, true);
        }
        if (unlikely(old_value == 0)) {
            DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
                    "Unbalanced call to dispatch_group_leave()");
        }
    }
    // _dispatch_group_wake: reached once the group empties. Pending notify blocks
    // are walked and dispatched on their respective queues; if there are waiters
    // they are woken (note the neat _dispatch_wake_by_address, an address-based
    // wake, ultimately a WakeByAddressAll/ulock-style system call)
    static void
    _dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
    {
        uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
        if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
            dispatch_continuation_t dc, next_dc, tail;
            // Snapshot before anything is notified/woken <rdar://problem/8554546>
            dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
            do {
                dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
                next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
                _dispatch_continuation_async(dsn_queue, dc,
                        _dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
                _dispatch_release(dsn_queue);
            } while ((dc = next_dc));
            refs++;
        }
        if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
            _dispatch_wake_by_address(&dg->dg_gen);
        }
        if (refs) _dispatch_release_n(dg, refs);
    }
    // dispatch_group_async: the dispatch_continuation_t merely wraps the task; the
    // core is _dispatch_continuation_group_async
    void
    dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_block_t db)
    {
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
        dispatch_qos_t qos;
        qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
        _dispatch_continuation_group_async(dg, dq, dc, qos);
    }
    // _dispatch_continuation_group_async: the key point to remember is the
    // dispatch_group_enter() call; _dispatch_continuation_async was covered earlier
    static inline void
    _dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_continuation_t dc, dispatch_qos_t qos)
    {
        dispatch_group_enter(dg);
        dc->dc_data = dg;
        _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
    }
    // dispatch_group_notify: note that the dispatch_block_t is stored in a shared
    // dispatch_continuation_t
    void
    dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_block_t db)
    {
        dispatch_continuation_t dsn = _dispatch_continuation_alloc();
        _dispatch_continuation_init(dsn, dq, db, 0, DC_FLAG_CONSUME);
        _dispatch_group_notify(dg, dq, dsn);
    }
    // _dispatch_group_notify: the essential call is _dispatch_group_wake, fired
    // immediately if the group state is already 0
    static inline void
    _dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
            dispatch_continuation_t dsn)
    {
        uint64_t old_state, new_state;
        dispatch_continuation_t prev;
        dsn->dc_data = dq;
        _dispatch_retain(dq);
        prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
        if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
        os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
        if (os_mpsc_push_was_empty(prev)) {
            os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
                new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
                if ((uint32_t)old_state == 0) {
                    os_atomic_rmw_loop_give_up({
                        return _dispatch_group_wake(dg, new_state, false);
                    });
                }
            });
        }
    }

Simply put, dispatch_group_async + dispatch_group_notify are essentially no different from dispatch_group_enter + dispatch_group_leave; the latter pair is just more flexible, as the sketch below shows.
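For reference, here is the enter/leave formulation, reusing the watch: helper from the semaphore section (a hypothetical sketch); this form also covers async APIs that only expose completion callbacks:

    dispatch_group_t group = dispatch_group_create();
    dispatch_group_enter(group); // +1 pending task
    [self watch:^(UIImage *image) {
        dispatch_group_leave(group); // balance the enter in the completion callback
    }];
    dispatch_group_notify(group, dispatch_get_main_queue(), ^{
        NSLog(@"every enter has been balanced by a leave");
    });

There is still one important operation we haven't looked at, dispatch_group_wait: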

    // os_atomic_rmw_loop2o loops over the state: if
    // (old_state & DISPATCH_GROUP_VALUE_MASK) == 0 the group is already empty and
    // 0 is returned; a timeout of 0 returns the timeout result immediately;
    // otherwise the waiters bit is set and _dispatch_group_wait_slow takes over
    long
    dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout)
    {
        uint64_t old_state, new_state;
        os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, relaxed, {
            if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
                os_atomic_rmw_loop_give_up_with_fence(acquire, return 0);
            }
            if (unlikely(timeout == 0)) {
                os_atomic_rmw_loop_give_up(return _DSEMA4_TIMEOUT());
            }
            new_state = old_state | DISPATCH_GROUP_HAS_WAITERS;
            if (unlikely(old_state & DISPATCH_GROUP_HAS_WAITERS)) {
                os_atomic_rmw_loop_give_up(break);
            }
        });
        return _dispatch_group_wait_slow(dg, _dg_state_gen(new_state), timeout);
    }
    // _dispatch_group_wait_slow ultimately calls _dispatch_wait_on_address, down
    // to __ulock_wait
    static long
    _dispatch_group_wait_slow(dispatch_group_t dg, uint32_t gen,
            dispatch_time_t timeout)
    {
        for (;;) {
            int rc = _dispatch_wait_on_address(&dg->dg_gen, gen, timeout, 0);
            if (likely(gen != os_atomic_load2o(dg, dg_gen, acquire))) {
                return 0;
            }
            if (rc == ETIMEDOUT) {
                return _DSEMA4_TIMEOUT();
            }
        }
    }

6. Delayed execution: dispatch_after

dispatch_after is another commonly used method, for delayed execution; typical usage looks like this:

    dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1.0 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
        NSLog(@"...");
    });

Before reading the dispatch_after source, a detour: the event source type dispatch_source_t. It is a type developers rarely associate with GCD even though it has its own creation function, dispatch_source_create (its use can in fact be traced all the way into Runloop internals). Most developers meet dispatch_source_t through timers; many articles teach you how to build a fairly accurate timer with code like this:

    dispatch_source_t timerSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0,
            dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0));
    dispatch_source_set_timer(timerSource, dispatch_time(DISPATCH_TIME_NOW, 0), 3*NSEC_PER_SEC, 0);
    dispatch_source_set_event_handler(timerSource, ^{
        NSLog(@"dispatch_source_t...");
    });
    dispatch_resume(timerSource);
    self.source = timerSource;

If you understand how that timer runs, the dispatch_after source below should be easy to follow:

    void
    dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
            dispatch_block_t work)
    {
        _dispatch_after(when, queue, NULL, work, true);
    }
    // _dispatch_after
    static inline void
    _dispatch_after(dispatch_time_t when, dispatch_queue_t dq,
            void *ctxt, void *handler, bool block)
    {
        dispatch_timer_source_refs_t dt;
        dispatch_source_t ds;
        uint64_t leeway, delta;
        if (when == DISPATCH_TIME_FOREVER) {
    #if DISPATCH_DEBUG
            DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
    #endif
            return;
        }
        delta = _dispatch_timeout(when);
        if (delta == 0) {
            if (block) {
                return dispatch_async(dq, handler);
            }
            return dispatch_async_f(dq, ctxt, handler);
        }
        leeway = delta / 10; // <rdar://problem/13447496>
        if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
        if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;
        // this function can and should be optimized to not use a dispatch source
        ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, dq);
        dt = ds->ds_timer_refs;
        dispatch_continuation_t dc = _dispatch_continuation_alloc();
        if (block) {
            _dispatch_continuation_init(dc, dq, handler, 0, 0);
        } else {
            _dispatch_continuation_init_f(dc, dq, ctxt, handler, 0, 0);
        }
        // reference `ds` so that it doesn't show up as a leak
        dc->dc_data = ds;
        _dispatch_trace_item_push(dq, dc);
        os_atomic_store2o(dt, ds_handler[DS_EVENT_HANDLER], dc, relaxed);
        dispatch_clock_t clock;
        uint64_t target;
        _dispatch_time_to_clock_and_value(when, &clock, &target);
        if (clock != DISPATCH_CLOCK_WALL) {
            leeway = _dispatch_time_nano2mach(leeway);
        }
        dt->du_timer_flags |= _dispatch_timer_flags_from_clock(clock);
        dt->dt_timer.target = target;
        dt->dt_timer.interval = UINT64_MAX;
        dt->dt_timer.deadline = target + leeway;
        dispatch_activate(ds);
    }

The code is not too complex: if there is no time delta it calls dispatch_async directly; otherwise it creates a dispatch_source_t, except the type is _dispatch_source_type_after rather than DISPATCH_SOURCE_TYPE_TIMER. Looking at the source, that is just a dispatch_source_type_s constant with no significant difference from _dispatch_source_type_timer:

    const dispatch_source_type_s _dispatch_source_type_after = {
        .dst_kind = "timer (after)",
        .dst_filter = DISPATCH_EVFILT_TIMER_WITH_CLOCK,
        .dst_flags = EV_DISPATCH,
        .dst_mask = 0,
        .dst_timer_flags = DISPATCH_TIMER_AFTER,
        .dst_action = DISPATCH_UNOTE_ACTION_SOURCE_TIMER,
        .dst_size = sizeof(struct dispatch_timer_source_refs_s),
        .dst_create = _dispatch_source_timer_create,
        .dst_merge_evt = _dispatch_source_merge_evt,
    };

dispatch_activate() starts the timer exactly as dispatch_resume() would. So why is there no dispatch_source_set_event_handler call to set the timer's handler? Look at the source of dispatch_source_set_event_handler:

    void
    dispatch_source_set_event_handler(dispatch_source_t ds,
            dispatch_block_t handler)
    {
        _dispatch_source_set_handler(ds, handler, DS_EVENT_HANDLER, true);
    }
    // _dispatch_source_set_handler
    static void
    _dispatch_source_set_handler(dispatch_source_t ds, void *func,
            uintptr_t kind, bool is_block)
    {
        dispatch_continuation_t dc;
        dc = _dispatch_source_handler_alloc(ds, func, kind, is_block);
        if (_dispatch_lane_try_inactive_suspend(ds)) {
            _dispatch_source_handler_replace(ds, kind, dc);
            return _dispatch_lane_resume(ds, DISPATCH_RESUME);
        }
        dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds);
        if (unlikely(dqf & DSF_STRICT)) {
            DISPATCH_CLIENT_CRASH(kind, "Cannot change a handler of this source "
                    "after it has been activated");
        }
        // Ignore handlers mutations past cancelation, it's harmless
        if ((dqf & DSF_CANCELED) == 0) {
            _dispatch_ktrace1(DISPATCH_PERF_post_activate_mutation, ds);
            if (kind == DS_REGISTN_HANDLER) {
                _dispatch_bug_deprecated("Setting registration handler after "
                        "the source has been activated");
            } else if (func == NULL) {
                _dispatch_bug_deprecated("Clearing handler after "
                        "the source has been activated");
            }
        }
        dc->dc_data = (void *)kind;
        _dispatch_barrier_trysync_or_async_f(ds, dc,
                _dispatch_source_set_handler_slow, 0);
    }

As you can see, the handler is ultimately wrapped into a dispatch_continuation_t and invoked synchronously or asynchronously; _dispatch_after above simply built that dispatch_continuation_t directly itself.

Canceling a delayed task

One remaining issue with dispatch_after is cancellation. The usual answer people reach for is:

    [self performSelector:@selector(myDelayedMethod) withObject:self afterDelay:desiredDelay];
    [NSObject cancelPreviousPerformRequestsWithTarget:self selector:@selector(myDelayedMethod) object:self];

However, on iOS 8 and later the dispatch block itself can be canceled (as shown below); if you still support versions below iOS 8, you can try a custom dispatch_cancelable_block_t class instead:

    dispatch_block_t block = dispatch_block_create(DISPATCH_BLOCK_INHERIT_QOS_CLASS, ^{
        NSLog(@"dispatch_after...");
    });
    dispatch_after(dispatch_time(DISPATCH_TIME_NOW, 3*NSEC_PER_SEC),
            dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), block);
    // cancel
    dispatch_block_cancel(block);

7. dispatch_apply

dispatch_apply is designed mainly to improve parallelism, so it is generally used to run many tasks of similar structure in parallel, as sketched below.
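A minimal usage sketch (hypothetical):

    // Process 100 independent items in parallel; dispatch_apply only returns
    // once every iteration has completed.
    dispatch_apply(100, DISPATCH_APPLY_AUTO, ^(size_t i) {
        NSLog(@"processing item %zu", i);
    });

Now its implementation: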

    void
    dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
    {
        dispatch_apply_f(iterations, dq, work,
                (dispatch_apply_function_t)_dispatch_Block_invoke(work));
    }
    // dispatch_apply_f: with width == 1 or a single-core CPU this is effectively a
    // synchronous call; the core is _dispatch_apply_f
    void
    dispatch_apply_f(size_t iterations, dispatch_queue_t _dq, void *ctxt,
            void (*func)(void *, size_t))
    {
        if (unlikely(iterations == 0)) {
            return;
        }
        dispatch_thread_context_t dtctxt =
                _dispatch_thread_context_find(_dispatch_apply_key);
        size_t nested = dtctxt ? dtctxt->dtc_apply_nesting : 0;
        dispatch_queue_t old_dq = _dispatch_queue_get_current();
        dispatch_queue_t dq;
        if (likely(_dq == DISPATCH_APPLY_AUTO)) {
            dq = _dispatch_apply_root_queue(old_dq)->_as_dq;
        } else {
            dq = _dq; // silence clang Nullability complaints
        }
        dispatch_qos_t qos = _dispatch_priority_qos(dq->dq_priority) ?:
                _dispatch_priority_fallback_qos(dq->dq_priority);
        if (unlikely(dq->do_targetq)) {
            // if the queue passed-in is not a root queue, use the current QoS
            // since the caller participates in the work anyway
            qos = _dispatch_qos_from_pp(_dispatch_get_priority());
        }
        int32_t thr_cnt = (int32_t)_dispatch_qos_max_parallelism(qos,
                DISPATCH_MAX_PARALLELISM_ACTIVE);
        if (likely(!nested)) {
            nested = iterations;
        } else {
            thr_cnt = nested < (size_t)thr_cnt ? thr_cnt / (int32_t)nested : 1;
            nested = nested < DISPATCH_APPLY_MAX && iterations < DISPATCH_APPLY_MAX
                    ? nested * iterations : DISPATCH_APPLY_MAX;
        }
        if (iterations < (size_t)thr_cnt) {
            thr_cnt = (int32_t)iterations;
        }
        struct dispatch_continuation_s dc = {
            .dc_func = (void*)func,
            .dc_ctxt = ctxt,
            .dc_data = dq,
        };
        dispatch_apply_t da = (__typeof__(da))_dispatch_continuation_alloc();
        da->da_index = 0;
        da->da_todo = iterations;
        da->da_iterations = iterations;
        da->da_nested = nested;
        da->da_thr_cnt = thr_cnt;
    #if DISPATCH_INTROSPECTION
        da->da_dc = _dispatch_continuation_alloc();
        *da->da_dc = dc;
        da->da_dc->dc_flags = DC_FLAG_ALLOCATED;
    #else
        da->da_dc = &dc;
    #endif
        da->da_flags = 0;
        if (unlikely(dq->dq_width == 1 || thr_cnt <= 1)) {
            return dispatch_sync_f(dq, da, _dispatch_apply_serial);
        }
        if (unlikely(dq->do_targetq)) {
            if (unlikely(dq == old_dq)) {
                return dispatch_sync_f(dq, da, _dispatch_apply_serial);
            } else {
                return dispatch_sync_f(dq, da, _dispatch_apply_redirect);
            }
        }
        dispatch_thread_frame_s dtf;
        _dispatch_thread_frame_push(&dtf, dq);
        _dispatch_apply_f(upcast(dq)._dgq, da, _dispatch_apply_invoke);
        _dispatch_thread_frame_pop(&dtf);
    }
  78. // 查看 _dispatch_apply_f
    static inline void
    _dispatch_apply_f(dispatch_queue_global_t dq, dispatch_apply_t da,
            dispatch_function_t func)
    {
        int32_t i = 0;
        dispatch_continuation_t head = NULL, tail = NULL;
        pthread_priority_t pp = _dispatch_get_priority();
        // The current thread does not need a continuation
        int32_t continuation_cnt = da->da_thr_cnt - 1;
        dispatch_assert(continuation_cnt);
        for (i = 0; i < continuation_cnt; i++) {
            dispatch_continuation_t next = _dispatch_continuation_alloc();
            uintptr_t dc_flags = DC_FLAG_CONSUME;
            _dispatch_continuation_init_f(next, dq, da, func,
                    DISPATCH_BLOCK_HAS_PRIORITY, dc_flags);
            next->dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG;
            next->do_next = head;
            head = next;
            if (!tail) {
                tail = next;
            }
        }
        _dispatch_thread_event_init(&da->da_event);
        // FIXME: dq may not be the right queue for the priority of `head`
        _dispatch_trace_item_push_list(dq, head, tail);
        _dispatch_root_queue_push_inline(dq, head, tail, continuation_cnt);
        // Call the first element directly
        _dispatch_apply_invoke_and_wait(da);
    }
    // Expanding _dispatch_apply_invoke_and_wait
    static void
    _dispatch_apply_invoke_and_wait(void *ctxt)
    {
        _dispatch_apply_invoke2(ctxt, DISPATCH_APPLY_INVOKE_WAIT);
        _dispatch_perfmon_workitem_inc();
    }
    // Looking at _dispatch_apply_invoke2: it loops claiming indexes off the
    // shared counter and invokes the task for each, and the thread that finishes
    // the last work item wakes the waiting caller via _dispatch_thread_event_signal
    static inline void
    _dispatch_apply_invoke2(dispatch_apply_t da, long invoke_flags)
    {
        size_t const iter = da->da_iterations;
        size_t idx, done = 0;
        idx = os_atomic_inc_orig2o(da, da_index, acquire);
        if (unlikely(idx >= iter)) goto out;
        // da_dc is only safe to access once the 'index lock' has been acquired
        dispatch_apply_function_t const func = (void *)da->da_dc->dc_func;
        void *const da_ctxt = da->da_dc->dc_ctxt;
        _dispatch_perfmon_workitem_dec(); // this unit executes many items
        // Handle nested dispatch_apply rdar://problem/9294578
        dispatch_thread_context_s apply_ctxt = {
            .dtc_key = _dispatch_apply_key,
            .dtc_apply_nesting = da->da_nested,
        };
        _dispatch_thread_context_push(&apply_ctxt);
        dispatch_thread_frame_s dtf;
        dispatch_priority_t old_dbp = 0;
        if (invoke_flags & DISPATCH_APPLY_INVOKE_REDIRECT) {
            dispatch_queue_t dq = da->da_dc->dc_data;
            _dispatch_thread_frame_push(&dtf, dq);
            old_dbp = _dispatch_set_basepri(dq->dq_priority);
        }
        dispatch_invoke_flags_t flags = da->da_flags;
        // Striding is the responsibility of the caller.
        do {
            dispatch_invoke_with_autoreleasepool(flags, {
                _dispatch_client_callout2(da_ctxt, idx, func);
                _dispatch_perfmon_workitem_inc();
                done++;
                idx = os_atomic_inc_orig2o(da, da_index, relaxed);
            });
        } while (likely(idx < iter));
        if (invoke_flags & DISPATCH_APPLY_INVOKE_REDIRECT) {
            _dispatch_reset_basepri(old_dbp);
            _dispatch_thread_frame_pop(&dtf);
        }
        _dispatch_thread_context_pop(&apply_ctxt);
        // The thread that finished the last workitem wakes up the possibly waiting
        // thread that called dispatch_apply. They could be one and the same.
        if (!os_atomic_sub2o(da, da_todo, done, release)) {
            _dispatch_thread_event_signal(&da->da_event);
        }
    out:
        if (invoke_flags & DISPATCH_APPLY_INVOKE_WAIT) {
            _dispatch_thread_event_wait(&da->da_event);
            _dispatch_thread_event_destroy(&da->da_event);
        }
        if (os_atomic_dec2o(da, da_thr_cnt, release) == 0) {
    #if DISPATCH_INTROSPECTION
            _dispatch_continuation_free(da->da_dc);
    #endif
            _dispatch_continuation_free((dispatch_continuation_t)da);
        }
    }
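
The shape of the algorithm is easy to restate: every participating thread repeatedly claims the next iteration index with an atomic fetch-and-increment, and whichever thread retires the last item signals the (possibly waiting) caller. Here is a standalone sketch of that pattern using C11 atomics and pthreads (apply_state, worker, and the constants are my names, not libdispatch's):

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdio.h>

    #define ITERATIONS 16
    #define THREADS    4

    struct apply_state {
        atomic_size_t index; // next iteration to claim, like da_index
        atomic_size_t todo;  // iterations not yet retired, like da_todo
    };

    static void work(size_t i) { printf("iteration %zu\n", i); }

    static void *worker(void *arg) {
        struct apply_state *s = arg;
        size_t done = 0;
        // Claim indexes until they run out, as _dispatch_apply_invoke2 does.
        size_t idx = atomic_fetch_add(&s->index, 1);
        while (idx < ITERATIONS) {
            work(idx);
            done++;
            idx = atomic_fetch_add(&s->index, 1);
        }
        // The thread that retires the last item is the one that would
        // signal da_event to wake the caller of dispatch_apply.
        if (done && atomic_fetch_sub(&s->todo, done) == done) {
            printf("last work item retired; wake the waiting caller\n");
        }
        return NULL;
    }

    int main(void) {
        struct apply_state s;
        atomic_init(&s.index, 0);
        atomic_init(&s.todo, ITERATIONS);
        pthread_t threads[THREADS];
        for (int t = 0; t < THREADS; t++)
            pthread_create(&threads[t], NULL, worker, &s);
        for (int t = 0; t < THREADS; t++)
            pthread_join(threads[t], NULL);
        return 0;
    }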

8、Supplementary notes

8.1 Threads and locks

GCD actually maintains two thread pools: one is the main thread pool, and the other serves everything outside the main thread. The main thread pool is managed by the main queue, whose serial number is 1. A diagram from objc.io illustrates the layout:

[figure: GCD queues backed by the main thread pool and the worker thread pool, from objc.io]

Everyone knows that dispatch_sync can easily deadlock, but why is that?

Recall the dispatch_sync call path:

  1. dispatch_sync
  2. _dispatch_sync_f: distinguishes serial from concurrent queues; for a serial queue it calls
  3. _dispatch_barrier_sync_f: checks whether we are targeting the queue we already hold; if so it falls into
  4. _dispatch_sync_f_slow

The key is the condition _dq_state_drain_locked_by(dq_state, dsc->dsc_waiter): when it holds, the deadlock is detected. It reduces to ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0. First, lock_value is XORed with tid, which produces 0 for every bit where the two agree and 1 where they differ; the result is then ANDed with DLOCK_OWNER_MASK (0xfffffffc), which discards the two low flag bits. The expression is therefore 0 exactly when the owner bits of lock_value equal tid, i.e. when the queue's drain lock is already held by the very thread that is now about to wait on it, and that is the deadlock.
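
A minimal repro of that condition: issuing a dispatch_sync against the serial queue you are already running on makes the owner bits recorded in lock_value equal to the current tid, so the check above fires (the queue name is arbitrary):

    dispatch_queue_t queue = dispatch_queue_create("com.example.serial", DISPATCH_QUEUE_SERIAL);
    dispatch_async(queue, ^{
        // This thread is now recorded as the owner of the queue's drain lock.
        dispatch_sync(queue, ^{
            NSLog(@"never reached");
        });
        // _dispatch_sync_f_slow spots lock_value's owner == current tid and
        // crashes rather than letting the thread wait on itself forever.
    });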

8.2 likely and unlikely

    #define likely(x) __builtin_expect(!!(x), 1)
    #define unlikely(x) __builtin_expect(!!(x), 0)

From the definitions: likely(x) tells the compiler the condition is expected to hold, and unlikely(x) that it is expected not to. Semantically nothing changes, if (likely(x == 0)) behaves exactly like if (x == 0); only the branch layout the compiler emits differs.

8.3 __builtin_expect

__builtin_expect is a compiler built-in that feeds branch-prediction information to the optimizer. Say we write code like this:

    if (a) {
        printf("%d\n", a);
    } else {
        printf("%d\n", b);
    }

If the a branch is the common case, we want it laid out on the fall-through path and the b branch taken only in rare cases. The CPU fetches instructions several at a time, so the code placed directly after the comparison is effectively already loaded; if that happens to be the rare branch, the common case pays for a jump and a fresh fetch, which is wasted work. The signature is long __builtin_expect(long exp, long c): the first argument is the expression being predicted and the second is its expected value, so __builtin_expect(a, false) says that a is false in the vast majority of cases and only rarely true. The compiler then arranges the code as if we had written if (!a):

    if (!a) {
        printf("%d\n", b);
    } else {
        printf("%d\n", a);
    }
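
A concrete (illustrative) use: error paths are the classic candidate for unlikely, as in this small sketch:

    #include <stdlib.h>

    int fill_buffer(size_t size) {
        char *buf = malloc(size);
        // Hint that allocation failure is the cold path.
        if (__builtin_expect(buf == NULL, 0)) {
            return -1; // rare error path, kept off the hot fall-through
        }
        // ... use buf on the hot path ...
        free(buf);
        return 0;
    }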

8.4 os_atomic_cmpxchg

Atomically compares the value at the first argument p with the expected value e: if they are equal, the third argument v is stored into p and the macro evaluates to true; if not, nothing is stored and the macro evaluates to false (the current value of p is written back into the local expected-value temporary).

    #define os_atomic_cmpxchg(p, e, v, m) \
        ({ _os_atomic_basetypeof(p) _r = (e); \
        atomic_compare_exchange_strong_explicit(_os_atomic_c11_atomic(p), \
                &_r, v, memory_order_##m, memory_order_relaxed); })
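
The same pattern in plain C11, sketched as a one-shot try-lock in the spirit of _dispatch_queue_try_acquire_barrier_sync (try_lock and owner are my names, not libdispatch's):

    #include <stdatomic.h>
    #include <stdbool.h>

    static atomic_uint owner; // 0 means unlocked

    // Take the lock by swapping 0 -> tid; fails if someone already owns it.
    static bool try_lock(unsigned tid) {
        unsigned expected = 0;
        return atomic_compare_exchange_strong_explicit(&owner, &expected, tid,
                memory_order_acquire, memory_order_relaxed);
    }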

8.5 os_atomic_store2o

Atomically stores the value v into the member f of the object pointed to by p (the 2o variants all address a field of a structure rather than a bare pointer):

    #define os_atomic_store2o(p, f, v, m) \
        os_atomic_store(&(p)->f, (v), m)
    #define os_atomic_store(p, v, m) \
        atomic_store_explicit(_os_atomic_c11_atomic(p), v, memory_order_##m)

8.6 os_atomic_inc_orig

Atomically adds 1 to the value at p and returns the original, pre-increment value (that is what the _orig suffix means):

    #define os_atomic_inc_orig(p, m) \
        os_atomic_add_orig((p), 1, m)
    #define os_atomic_add_orig(p, v, m) \
        _os_atomic_c11_op_orig((p), (v), m, add, +)
    #define _os_atomic_c11_op_orig(p, v, m, o, op) \
        atomic_fetch_##o##_explicit(_os_atomic_c11_atomic(p), v, \
                memory_order_##m)

8.7 os_atomic_inc_orig2o

Atomically adds 1 to the member f of p and returns the pre-increment value; this is exactly how _dispatch_apply_invoke2 claims iteration indexes above:

    #define os_atomic_inc_orig2o(p, f, m) \
        os_atomic_add_orig2o(p, f, 1, m)
    #define os_atomic_add_orig2o(p, f, v, m) \
        os_atomic_add_orig(&(p)->f, (v), m)

8.8 os_atomic_dec2o

Atomically subtracts 1 from the member f of p and, having no _orig suffix, returns the new post-decrement value, which is why da_thr_cnt reaching 0 above identifies the last thread out:

    #define os_atomic_dec2o(p, f, m) \
        os_atomic_sub2o(p, f, 1, m)
    #define os_atomic_sub2o(p, f, v, m) \
        os_atomic_sub(&(p)->f, (v), m)
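
To see the _orig distinction concretely, here is a hedged C11 sketch of the two counters dispatch_apply leans on (the field names are borrowed from dispatch_apply_s purely for readability):

    #include <stdatomic.h>
    #include <stdio.h>

    struct apply_like {
        atomic_size_t da_index; // next index to hand out
        atomic_int da_thr_cnt;  // participating threads still running
    };

    int main(void) {
        struct apply_like da;
        atomic_init(&da.da_index, 0);
        atomic_init(&da.da_thr_cnt, 1);

        // inc_orig2o: fetch_add returns the value *before* the increment,
        // which is precisely the iteration index this thread just claimed.
        size_t idx = atomic_fetch_add_explicit(&da.da_index, 1,
                memory_order_acquire);
        printf("claimed index %zu\n", idx); // prints 0; da_index is now 1

        // dec2o: subtract, then look at the value *after* the decrement,
        // so the thread that sees 0 knows it is the last one out.
        int remaining = atomic_fetch_sub_explicit(&da.da_thr_cnt, 1,
                memory_order_release) - 1;
        if (remaining == 0) {
            printf("last thread out: free the descriptor\n");
        }
        return 0;
    }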