1. 主队列分析

主队列是GCD提供的特殊的串行队列,在libdispatch源码中,找到主队列的定义,找到它是串行队列的依据

1.1 找到主队列的类型实现

1.1 方式一

搜索dispatch_get_main_queue关键字

  1. DISPATCH_INLINE DISPATCH_ALWAYS_INLINE DISPATCH_CONST DISPATCH_NOTHROW
  2. dispatch_queue_main_t
  3. dispatch_get_main_queue(void)
  4. {
  5. return DISPATCH_GLOBAL_OBJECT(dispatch_queue_main_t, _dispatch_main_q);
  6. }

参数2为队列类型,在源码中搜索_dispatch_main_q,找到的结果非常多,这时可以尝试搜索_dispatch_main_q =,如果是赋值操作,可以缩小结果的范围

搜索_dispatch_main_q =,找到实现代码

  1. struct dispatch_queue_static_s _dispatch_main_q = {
  2. DISPATCH_GLOBAL_OBJECT_HEADER(queue_main),
  3. #if !DISPATCH_USE_RESOLVERS
  4. .do_targetq = _dispatch_get_default_queue(true),
  5. #endif
  6. .dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1) |
  7. DISPATCH_QUEUE_ROLE_BASE_ANON,
  8. .dq_label = "com.apple.main-thread",
  9. .dq_atomic_flags = DQF_THREAD_BOUND | DQF_WIDTH(1),
  10. .dq_serialnum = 1,
  11. };

1.2 方式二

在项目中,创建主队列,使用lldb,可以输出它的结构,其中包含队列标识符

  1. dispatch_queue_t queue = dispatch_get_main_queue();
  2. -------------------------
  3. (lldb) po queue
  4. <OS_dispatch_queue_main: com.apple.main-thread[0x104a6cc80] = { xref = -2147483648, ref = -2147483648, sref = 1, target = com.apple.root.default-qos.overcommit[0x104a6d100], width = 0x1, state = 0x001ffe9000000300, dirty, in-flight = 0, thread = 0x303 }>

来到libdispatch源码中,直接搜索com.apple.main-thread,也可以直接定位到它的实现代码

1.2 主队列是串行队列的依据

实现代码中,主队列的dq_atomic_flagsDQF_WIDTH(1),我们可以通过找到串行队列的dq_atomic_flags进行对比,如果一致,即可证明主队列就是一个串行队列

在源码中,搜索dispatch_queue_create

  1. dispatch_queue_t
  2. dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
  3. {
  4. return _dispatch_lane_create_with_target(label, attr,
  5. DISPATCH_TARGET_QUEUE_DEFAULT, true);
  6. }

进入_dispatch_lane_create_with_target函数,找到创建队列的核心代码
image.png

  • 创建队列,开辟空间
  • 调用构造函数,如果dqai_concurrent非真,传入1

进入_dispatch_queue_init函数,找到参数3
image.png

如果是串行队列,传入的width1dqfDQF_WIDTH(1),和主队列dq_atomic_flags属性设置的值一致,所以主队列就是一个串行队列

1.3 dq_serialnum的作用

有一些说法,如果dq_serialnum设置为1,就是串行队列,例如:主队列的设置image.png

在源码中,搜索dq_serialnum,可以找到很多赋值的场景
image.png

进入DISPATCH_QUEUE_SERIAL_NUMBER_WLF的定义
image.png

同时找到了dq_serialnum的相关注释

  • 跳过零

  • 主队列:1

  • mgr_q2

  • mgr_root_q3

  • 全局队列:4~15

  • 自定义队列:17

所以dq_serialnum并不是用来标记串行和并发队列的

1.4 自定义队列的dq_serialnum

搜索DISPATCH_QUEUE_SERIAL_NUMBER_INIT,找到被赋值的代码

  1. unsigned long volatile _dispatch_queue_serial_numbers =
  2. DISPATCH_QUEUE_SERIAL_NUMBER_INIT;

搜索_dispatch_queue_serial_numbers
image.png

找到宏定义

  1. #define os_atomic_inc_orig(p, m) \
  2. os_atomic_add_orig((p), 1, m)
  3. #define os_atomic_add_orig(p, v, m) \
  4. _os_atomic_c11_op_orig((p), (v), m, add, +)
  5. #define _os_atomic_c11_op_orig(p, v, m, o, op) \
  6. atomic_fetch_##o##_explicit(_os_atomic_c11_atomic(p), v, \
  7. memory_order_##m)
  • atomic_fetch_##o##_explicit中的##o##为占位,替换的o传入的值为add

  • 替换后为atomic_fetch_add_explicit,来自C++11的原子操作函数

  • 宏的作用,传入17,之后每次+1

2. 全局队列分析

lldb中,输出全局队列的标识符

  1. dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
  2. -------------------------
  3. (lldb) po queue
  4. <OS_dispatch_queue_global: com.apple.root.default-qos[0x102b85080] = { xref = -2147483648, ref = -2147483648, sref = 1, target = [0x0], width = 0xfff, state = 0x0060000000000000, in-barrier}>

来到libdispatch源码中,搜索com.apple.root.default-qos

  1. struct dispatch_queue_global_s _dispatch_root_queues[] = {
  2. #define _DISPATCH_ROOT_QUEUE_IDX(n, flags) \
  3. ((flags & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) ? \
  4. DISPATCH_ROOT_QUEUE_IDX_##n##_QOS_OVERCOMMIT : \
  5. DISPATCH_ROOT_QUEUE_IDX_##n##_QOS)
  6. #define _DISPATCH_ROOT_QUEUE_ENTRY(n, flags, ...) \
  7. [_DISPATCH_ROOT_QUEUE_IDX(n, flags)] = { \
  8. DISPATCH_GLOBAL_OBJECT_HEADER(queue_global), \
  9. .dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, \
  10. .do_ctxt = _dispatch_root_queue_ctxt(_DISPATCH_ROOT_QUEUE_IDX(n, flags)), \
  11. .dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL), \
  12. .dq_priority = flags | ((flags & DISPATCH_PRIORITY_FLAG_FALLBACK) ? \
  13. _dispatch_priority_make_fallback(DISPATCH_QOS_##n) : \
  14. _dispatch_priority_make(DISPATCH_QOS_##n, 0)), \
  15. __VA_ARGS__ \
  16. }
  17. _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, 0,
  18. .dq_label = "com.apple.root.maintenance-qos",
  19. .dq_serialnum = 4,
  20. ),
  21. _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
  22. .dq_label = "com.apple.root.maintenance-qos.overcommit",
  23. .dq_serialnum = 5,
  24. ),
  25. _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, 0,
  26. .dq_label = "com.apple.root.background-qos",
  27. .dq_serialnum = 6,
  28. ),
  29. _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
  30. .dq_label = "com.apple.root.background-qos.overcommit",
  31. .dq_serialnum = 7,
  32. ),
  33. _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, 0,
  34. .dq_label = "com.apple.root.utility-qos",
  35. .dq_serialnum = 8,
  36. ),
  37. _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
  38. .dq_label = "com.apple.root.utility-qos.overcommit",
  39. .dq_serialnum = 9,
  40. ),
  41. _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_FALLBACK,
  42. .dq_label = "com.apple.root.default-qos",
  43. .dq_serialnum = 10,
  44. ),
  45. _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
  46. DISPATCH_PRIORITY_FLAG_FALLBACK | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
  47. .dq_label = "com.apple.root.default-qos.overcommit",
  48. .dq_serialnum = 11,
  49. ),
  50. _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, 0,
  51. .dq_label = "com.apple.root.user-initiated-qos",
  52. .dq_serialnum = 12,
  53. ),
  54. _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
  55. .dq_label = "com.apple.root.user-initiated-qos.overcommit",
  56. .dq_serialnum = 13,
  57. ),
  58. _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, 0,
  59. .dq_label = "com.apple.root.user-interactive-qos",
  60. .dq_serialnum = 14,
  61. ),
  62. _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
  63. .dq_label = "com.apple.root.user-interactive-qos.overcommit",
  64. .dq_serialnum = 15,
  65. ),
  66. };
  • _dispatch_root_queues为数组类型,默认将4~15的全局队列全部初始化,获取时通过不同类型,获取不同的队列

  • 和主队列的类型差异

    • 主队列:dispatch_queue_static_s

    • 全局队列:dispatch_queue_global_s

3. 创建队列

搜索dispatch_queue_create,进入_dispatch_lane_create_with_target函数

  1. DISPATCH_NOINLINE
  2. static dispatch_queue_t
  3. _dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
  4. dispatch_queue_t tq, bool legacy)
  5. {
  6. // 1、创建dqai
  7. dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
  8. // 2、规范化参数,例如:qos, overcommit, tq
  9. dispatch_qos_t qos = dqai.dqai_qos;
  10. ...
  11. // 3、根据队列类型,创建vtable
  12. if (legacy) {
  13. // if any of these attributes is specified, use non legacy classes
  14. if (dqai.dqai_inactive || dqai.dqai_autorelease_frequency) {
  15. legacy = false;
  16. }
  17. }
  18. const void *vtable;
  19. dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0;
  20. if (dqai.dqai_concurrent) {
  21. vtable = DISPATCH_VTABLE(queue_concurrent);
  22. } else {
  23. vtable = DISPATCH_VTABLE(queue_serial);
  24. }
  25. ...
  26. // 4、开辟空间,初始化队列
  27. dispatch_lane_t dq = _dispatch_object_alloc(vtable,
  28. sizeof(struct dispatch_lane_s));
  29. _dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
  30. DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
  31. (dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
  32. dq->dq_label = label;
  33. ...
  34. // 5、根据不同类型,通过模板创建队列
  35. dq->do_targetq = tq;
  36. _dispatch_object_debug(dq, "%s", __func__);
  37. return _dispatch_trace_queue_create(dq)._dq;
  38. }

2.1 创建dqai

进入_dispatch_queue_attr_to_info函数
image.png

  • 创建空的dqai,默认为串行队列。如果传入的参数为空,返回空dqai

  • 传入的参数有值,将队列信息保存到dqai

2.2 规范化参数

image.png

  • 设置优先级、服务质量等参数

2.3 创建vtable

通过DISPATCH_VTABLE,将传入的队列类型拼接vtable
image.png

找到DISPATCH_VTABLE宏的定义

  1. #define DISPATCH_VTABLE(name) DISPATCH_OBJC_CLASS(name)
  2. #define DISPATCH_OBJC_CLASS(name) (&DISPATCH_CLASS_SYMBOL(name))
  3. #define DISPATCH_CLASS_SYMBOL(name) _dispatch_##name##_vtable

2.4 初始化队列

使用_dispatch_object_alloc函数开辟空间,进入_os_object_alloc_realized函数
image.png

  • 包含isa指向,说明队列也是一个对象

使用_dispatch_queue_init函数初始化队列
image.png

  • 队列类型为dispatch_queue_t,为队列的成员变量赋值

2.5 通过模板创建队列

使用_dispatch_introspection_queue_create函数,传入初始化后的dq

流程:_dispatch_introspection_queue_create_dispatch_introspection_queue_create_hookdispatch_introspection_queue_get_info_dispatch_introspection_lane_get_info
image.png

4. 继承链

使用_dispatch_lane_create_with_target创建队列,返回dispatch_queue_t类型

dispatch_queue_t类型来自于宏定义

  1. DISPATCH_DECL(dispatch_queue);
  2. #define DISPATCH_DECL(name) OS_OBJECT_DECL_SUBCLASS(name, dispatch_object)
  3. #define OS_OBJECT_DECL_SUBCLASS(name, super) \
  4. OS_OBJECT_DECL_IMPL(name, NSObject, <OS_OBJECT_CLASS(super)>)
  5. #define OS_OBJECT_DECL_IMPL(name, adhere, ...) \
  6. OS_OBJECT_DECL_PROTOCOL(name, __VA_ARGS__) \
  7. typedef adhere<OS_OBJECT_CLASS(name)> \
  8. * OS_OBJC_INDEPENDENT_CLASS name##_t
  9. #define OS_OBJECT_DECL_PROTOCOL(name, ...) \
  10. @protocol OS_OBJECT_CLASS(name) __VA_ARGS__ \
  11. @end
  12. #define OS_OBJECT_CLASS(name) OS_##name
  • 第一步:使用OS_OBJECT_CLASS在前面拼接OS_,例如:OS_dispatch_queue

  • 第二步:使用OS_OBJECT_DECL_PROTOCOL在结尾拼接_t,例如:OS_dispatch_queue_t

  • 第三步:使用泛型接收,typedef adhere<OS_dispatch_queue> * OS_OBJC_INDEPENDENT_CLASS OS_dispatch_queue_t

DISPATCH_DECL宏在dispatch_object_s结构体中的定义

  1. #define DISPATCH_DECL(name) \
  2. typedef struct name##_s : public dispatch_object_s {} *name##_t
  • typedef struct dispatch_queue_s : public dispatch_object_s {} *dispatch_queue_t

  • dispatch_queue_t的本质是dispatch_queue_sdispatch_queue_s继承自dispatch_object_s

dispatch_object_s来自于dispatch_object_t的联合体

  1. typedef union {
  2. struct _os_object_s *_os_obj;
  3. struct dispatch_object_s *_do;
  4. struct dispatch_queue_s *_dq;
  5. struct dispatch_queue_attr_s *_dqa;
  6. struct dispatch_group_s *_dg;
  7. struct dispatch_source_s *_ds;
  8. struct dispatch_channel_s *_dch;
  9. struct dispatch_mach_s *_dm;
  10. struct dispatch_mach_msg_s *_dmsg;
  11. struct dispatch_semaphore_s *_dsema;
  12. struct dispatch_data_s *_ddata;
  13. struct dispatch_io_s *_dchannel;
  14. } dispatch_object_t DISPATCH_TRANSPARENT_UNION;
  • 所以最终的根类是dispatch_object_t

找到dispatch_queue_s的结构

  1. struct dispatch_queue_s {
  2. DISPATCH_QUEUE_CLASS_HEADER(queue, void *__dq_opaque1);
  3. /* 32bit hole on LP64 */
  4. } DISPATCH_ATOMIC64_ALIGN;

找到结构体内嵌套的DISPATCH_QUEUE_CLASS_HEADER定义

  1. #define DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__) \
  2. _DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__); \
  3. /* LP64 global queue cacheline boundary */ \
  4. unsigned long dq_serialnum; \
  5. const char *dq_label; \
  6. DISPATCH_UNION_LE(uint32_t volatile dq_atomic_flags, \
  7. const uint16_t dq_width, \
  8. const uint16_t __dq_opaque2 \
  9. ); \
  10. dispatch_priority_t dq_priority; \
  11. union { \
  12. struct dispatch_queue_specific_head_s *dq_specific_head; \
  13. struct dispatch_source_refs_s *ds_refs; \
  14. struct dispatch_timer_source_refs_s *ds_timer_refs; \
  15. struct dispatch_mach_recv_refs_s *dm_recv_refs; \
  16. struct dispatch_channel_callbacks_s const *dch_callbacks; \
  17. }; \
  18. int volatile dq_sref_cnt
  19. #define _DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__) \
  20. DISPATCH_OBJECT_HEADER(x); \
  21. __pointer_sized_field__; \
  22. DISPATCH_UNION_LE(uint64_t volatile dq_state, \
  23. dispatch_lock dq_state_lock, \
  24. uint32_t dq_state_bits \
  25. )
  26. #define DISPATCH_OBJECT_HEADER(x) \
  27. struct dispatch_object_s _as_do[0]; \
  28. _DISPATCH_OBJECT_HEADER(x)
  • DISPATCH_OBJECT_HEADER中包含dispatch_object_s结构体和_DISPATCH_OBJECT_HEADER

找到_DISPATCH_OBJECT_HEADER的定义

  1. #define _DISPATCH_OBJECT_HEADER(x) \
  2. struct _os_object_s _as_os_obj[0]; \
  3. OS_OBJECT_STRUCT_HEADER(dispatch_##x); \
  4. struct dispatch_##x##_s *volatile do_next; \
  5. struct dispatch_queue_s *do_targetq; \
  6. void *do_ctxt; \
  7. union { \
  8. dispatch_function_t DISPATCH_FUNCTION_POINTER do_finalizer; \
  9. void *do_introspection_ctxt; \
  10. }
  • _DISPATCH_OBJECT_HEADER中包含_os_object_s结构体和OS_OBJECT_STRUCT_HEADER

找到OS_OBJECT_STRUCT_HEADER的定义

  1. #define OS_OBJECT_STRUCT_HEADER(x) \
  2. _OS_OBJECT_HEADER(\
  3. const struct x##_vtable_s *__ptrauth_objc_isa_pointer do_vtable, \
  4. do_ref_cnt, \
  5. do_xref_cnt)
  6. #define _OS_OBJECT_HEADER(isa, ref_cnt, xref_cnt) \
  7. isa; /* must be pointer-sized and use __ptrauth_objc_isa_pointer */ \
  8. int volatile ref_cnt; \
  9. int volatile xref_cnt
  • 包含isa成员变量

所以dispatch_queue_s结构体还伪继承于_os_object_s,并包含isa成员变量,说明队列也是一个对象

5. 同步函数

5.1 全局队列

同步函数配合全局队列

  1. dispatch_sync(dispatch_get_global_queue(0, 0), ^{
  2. NSLog(@"Block");
  3. });

libdispatch源码中,找到dispatch_sync函数的实现

  1. void
  2. dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
  3. {
  4. uintptr_t dc_flags = DC_FLAG_BLOCK;
  5. if (unlikely(_dispatch_block_has_private_data(work))) {
  6. return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
  7. }
  8. _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
  9. }
  • 传入的work参数就是任务的block,只需要一直追寻work参数就能找到调用的代码
  • 传入_dispatch_sync_f函数的参数3,对work进行包装

找到_dispatch_Block_invoke的定义

  1. #define _dispatch_Block_invoke(bb) \
  2. ((dispatch_function_t)((struct Block_layout *)bb)->invoke)
  • block下的invoke强转为dispatch_function_t结构体

进入_dispatch_sync_f函数

  1. static void
  2. _dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
  3. uintptr_t dc_flags)
  4. {
  5. _dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
  6. }
  • ctxtblock
  • funcinvoke

进入_dispatch_sync_f_inline函数

  1. static inline void
  2. _dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
  3. dispatch_function_t func, uintptr_t dc_flags)
  4. {
  5. if (likely(dq->dq_width == 1)) {
  6. return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
  7. }
  8. if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
  9. DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
  10. }
  11. dispatch_lane_t dl = upcast(dq)._dl;
  12. // Global concurrent queues and queues bound to non-dispatch threads
  13. // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
  14. if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
  15. return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
  16. }
  17. if (unlikely(dq->do_targetq->do_targetq)) {
  18. return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
  19. }
  20. _dispatch_introspection_sync_begin(dl);
  21. _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
  22. _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
  23. }
  • 函数中出现了复杂的逻辑,可以使用符号断点,找到正确的逻辑分支

在项目中,将所有函数设置符号断点
image.png

  • 进入_dispatch_sync_f_slow函数

进入_dispatch_sync_f_slow函数

  1. DISPATCH_NOINLINE
  2. static void
  3. _dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
  4. dispatch_function_t func, uintptr_t top_dc_flags,
  5. dispatch_queue_class_t dqu, uintptr_t dc_flags)
  6. {
  7. dispatch_queue_t top_dq = top_dqu._dq;
  8. dispatch_queue_t dq = dqu._dq;
  9. if (unlikely(!dq->do_targetq)) {
  10. return _dispatch_sync_function_invoke(dq, ctxt, func);
  11. }
  12. pthread_priority_t pp = _dispatch_get_priority();
  13. struct dispatch_sync_context_s dsc = {
  14. .dc_flags = DC_FLAG_SYNC_WAITER | dc_flags,
  15. .dc_func = _dispatch_async_and_wait_invoke,
  16. .dc_ctxt = &dsc,
  17. .dc_other = top_dq,
  18. .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
  19. .dc_voucher = _voucher_get(),
  20. .dsc_func = func,
  21. .dsc_ctxt = ctxt,
  22. .dsc_waiter = _dispatch_tid_self(),
  23. };
  24. _dispatch_trace_item_push(top_dq, &dsc);
  25. __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
  26. if (dsc.dsc_func == NULL) {
  27. // dsc_func being cleared means that the block ran on another thread ie.
  28. // case (2) as listed in _dispatch_async_and_wait_f_slow.
  29. dispatch_queue_t stop_dq = dsc.dc_other;
  30. return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
  31. }
  32. _dispatch_introspection_sync_begin(top_dq);
  33. _dispatch_trace_item_pop(top_dq, &dsc);
  34. _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
  35. DISPATCH_TRACE_ARG(&dsc));
  36. }
  • 同样是复杂逻辑,继续使用符号断点

在项目中,将所有可能的条件分支,全部设置符号断点image.png

  • 进入_dispatch_sync_function_invoke函数

进入_dispatch_sync_function_invoke函数

  1. DISPATCH_NOINLINE
  2. static void
  3. _dispatch_sync_function_invoke(dispatch_queue_class_t dq, void *ctxt,
  4. dispatch_function_t func)
  5. {
  6. _dispatch_sync_function_invoke_inline(dq, ctxt, func);
  7. }

进入_dispatch_sync_function_invoke_inline函数

  1. static inline void
  2. _dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
  3. dispatch_function_t func)
  4. {
  5. dispatch_thread_frame_s dtf;
  6. _dispatch_thread_frame_push(&dtf, dq);
  7. _dispatch_client_callout(ctxt, func);
  8. _dispatch_perfmon_workitem_inc();
  9. _dispatch_thread_frame_pop(&dtf);
  10. }
  • _dispatch_thread_frame_push:任务加入队列
  • _dispatch_client_callout:执行任务
  • _dispatch_thread_frame_pop:任务移除队列

进入_dispatch_client_callout函数

  1. void
  2. _dispatch_client_callout(void *ctxt, dispatch_function_t f)
  3. {
  4. _dispatch_get_tsd_base();
  5. void *u = _dispatch_get_unwind_tsd();
  6. if (likely(!u)) return f(ctxt);
  7. _dispatch_set_unwind_tsd(NULL);
  8. f(ctxt);
  9. _dispatch_free_unwind_tsd();
  10. _dispatch_set_unwind_tsd(u);
  11. }
  • 调用f(ctxt)就是内部自动调用block任务的代码

通过lldb进行验证
image.png

  • 和源码中的逻辑完全一致

同步函数顺序被执行,因为_dispatch_sync_function_invoke_inline中,三个函数的顺序调用:

  • _dispatch_thread_frame_push:任务加入队列

  • _dispatch_client_callout:执行任务

  • _dispatch_thread_frame_pop:任务移除队列

当同步函数加入串行队列,在_dispatch_sync_f_inline函数中,使用_dispatch_barrier_sync_f同步栅栏函数

5.2 并发队列

同步函数配合并发队列

  1. dispatch_queue_t queue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);
  2. dispatch_sync(queue, ^{
  3. NSLog(@"block---:%@",[NSThread currentThread]);
  4. });

进入_dispatch_sync_f_inline函数,并发队列触发的代码分支和全局队列有所不同
image.png

进入_dispatch_sync_invoke_and_complete函数

  1. static void
  2. _dispatch_sync_invoke_and_complete(dispatch_lane_t dq, void *ctxt,
  3. dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
  4. {
  5. _dispatch_sync_function_invoke_inline(dq, ctxt, func);
  6. _dispatch_trace_item_complete(dc);
  7. _dispatch_lane_non_barrier_complete(dq, 0);
  8. }
  • 最终调用的还是_dispatch_sync_function_invoke_inline函数

函数中的最后一个入参

  1. dispatch_function_t func DISPATCH_TRACE_ARG(void *dc)
  2. #define DISPATCH_TRACE_ARG(arg)
  3. #define DISPATCH_TRACE_ARG(arg) , arg
  • ,arg封装到一起,所以外部不用加额外的逗号

5.3 死锁

image.png

  • 当出现死锁异常时,调用_dispatch_sync_f_slow函数
  • 然后调用__DISPATCH_WAIT_FOR_QUEUE__,抛出异常

探索流程:dispatch_sync_dispatch_sync_f_dispatch_sync_f_inline_dispatch_sync_f_inline

进入_dispatch_sync_f_inline函数
image.png

  • 当前为同步函数,并且加入串行队列,调用_dispatch_barrier_sync_f函数
  • 同步函数加入串行队列的底层,使用同步栅栏函数

进入_dispatch_barrier_sync_f函数

  1. static void
  2. _dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
  3. dispatch_function_t func, uintptr_t dc_flags)
  4. {
  5. _dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
  6. }

进入_dispatch_barrier_sync_f_inline函数
image.png

  • 出现死锁异常时,已知的_dispatch_sync_f_slow函数

进入_dispatch_sync_f_slow函数
image.png

  • 内部调用__DISPATCH_WAIT_FOR_QUEUE__抛出异常

进入__DISPATCH_WAIT_FOR_QUEUE__函数
image.png

  • _dq_state_drain_locked_by函数返回真,抛出异常
  • 传入两个参数,dq_state为队列状态,dsc_waiter为线程的tid

进入_dq_state_drain_locked_by函数

  1. static inline bool
  2. _dq_state_drain_locked_by(uint64_t dq_state, dispatch_tid tid)
  3. {
  4. return _dispatch_lock_is_locked_by((dispatch_lock)dq_state, tid);
  5. }

进入_dispatch_lock_is_locked_by函数

  1. static inline bool
  2. _dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
  3. {
  4. // equivalent to _dispatch_lock_owner(lock_value) == tid
  5. return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
  6. }
  • 找到判断依据

找到DLOCK_OWNER_MASK定义

  1. #define DLOCK_OWNER_MASK ((dispatch_lock)0xfffffffc)
  • DLOCK_OWNER_MASK定位为很大值,只要一个非零的值和它进行&运算,结果一定不为领

  • lock_valuetid进行按位异或,它们运算结果为零只有一种可能,就是二者的值相同

当同步函数加入到串行队列中,同一线程在等待时又被执行,就会形成相互等待的局面,造成死锁的异常

6. 异步函数

libdispatch源码中,找到dispatch_async函数的实现

  1. void
  2. dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
  3. {
  4. dispatch_continuation_t dc = _dispatch_continuation_alloc();
  5. uintptr_t dc_flags = DC_FLAG_CONSUME;
  6. dispatch_qos_t qos;
  7. qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
  8. _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
  9. }
  • 将任务封装成dispatch_qos_t类型的qos
  • 调用_dispatch_continuation_async函数


    6.1 任务和优先级的封装

进入_dispatch_continuation_init函数

  1. static inline dispatch_qos_t
  2. _dispatch_continuation_init(dispatch_continuation_t dc,
  3. dispatch_queue_class_t dqu, dispatch_block_t work,
  4. dispatch_block_flags_t flags, uintptr_t dc_flags)
  5. {
  6. void *ctxt = _dispatch_Block_copy(work);
  7. dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
  8. if (unlikely(_dispatch_block_has_private_data(work))) {
  9. dc->dc_flags = dc_flags;
  10. dc->dc_ctxt = ctxt;
  11. // will initialize all fields but requires dc_flags & dc_ctxt to be set
  12. return _dispatch_continuation_init_slow(dc, dqu, flags);
  13. }
  14. dispatch_function_t func = _dispatch_Block_invoke(work);
  15. if (dc_flags & DC_FLAG_CONSUME) {
  16. func = _dispatch_call_block_and_release;
  17. }
  18. return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
  19. }

进入_dispatch_continuation_init_f函数

  1. static inline dispatch_qos_t
  2. _dispatch_continuation_init_f(dispatch_continuation_t dc,
  3. dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f,
  4. dispatch_block_flags_t flags, uintptr_t dc_flags)
  5. {
  6. pthread_priority_t pp = 0;
  7. dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED;
  8. dc->dc_func = f;
  9. dc->dc_ctxt = ctxt;
  10. // in this context DISPATCH_BLOCK_HAS_PRIORITY means that the priority
  11. // should not be propagated, only taken from the handler if it has one
  12. if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
  13. pp = _dispatch_priority_propagate();
  14. }
  15. _dispatch_continuation_voucher_set(dc, flags);
  16. return _dispatch_continuation_priority_set(dc, dqu, pp, flags);
  17. }
  • block任务进行封装

进入_dispatch_continuation_priority_set函数

  1. static inline dispatch_qos_t
  2. _dispatch_continuation_priority_set(dispatch_continuation_t dc,
  3. dispatch_queue_class_t dqu,
  4. pthread_priority_t pp, dispatch_block_flags_t flags)
  5. {
  6. dispatch_qos_t qos = DISPATCH_QOS_UNSPECIFIED;
  7. #if HAVE_PTHREAD_WORKQUEUE_QOS
  8. dispatch_queue_t dq = dqu._dq;
  9. if (likely(pp)) {
  10. bool enforce = (flags & DISPATCH_BLOCK_ENFORCE_QOS_CLASS);
  11. bool is_floor = (dq->dq_priority & DISPATCH_PRIORITY_FLAG_FLOOR);
  12. bool dq_has_qos = (dq->dq_priority & DISPATCH_PRIORITY_REQUESTED_MASK);
  13. if (enforce) {
  14. pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
  15. qos = _dispatch_qos_from_pp_unsafe(pp);
  16. } else if (!is_floor && dq_has_qos) {
  17. pp = 0;
  18. } else {
  19. qos = _dispatch_qos_from_pp_unsafe(pp);
  20. }
  21. }
  22. dc->dc_priority = pp;
  23. #else
  24. (void)dc; (void)dqu; (void)pp; (void)flags;
  25. #endif
  26. return qos;
  27. }
  • 对任务优先级进行封装

异步函数的执行,优先级是衡量标准的其中一项。而且任务根据CPU的调度情况异步执行,所以一定是无序的,需要对其进行封装

6.2 并发队列

异步函数配合并发队列

  1. dispatch_queue_t queue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);
  2. dispatch_async(queue, ^{
  3. NSLog(@"block---:%@",[NSThread currentThread]);
  4. });

进入_dispatch_continuation_async函数

  1. static inline void
  2. _dispatch_continuation_async(dispatch_queue_class_t dqu,
  3. dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
  4. {
  5. #if DISPATCH_INTROSPECTION
  6. if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
  7. _dispatch_trace_item_push(dqu, dc);
  8. }
  9. #else
  10. (void)dc_flags;
  11. #endif
  12. return dx_push(dqu._dq, dc, qos);
  13. }

找到dx_push的宏定义

  1. #define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
  • 此时我们关注的是参数3的调用,可以暂时无视dx_vtable,继续跟踪dq_push
  • dq_push属于函数的赋值点,根据队列类型的不同赋值也不同

查看并发队列的赋值

  1. DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
  2. .do_type = DISPATCH_QUEUE_CONCURRENT_TYPE,
  3. .do_dispose = _dispatch_lane_dispose,
  4. .do_debug = _dispatch_queue_debug,
  5. .do_invoke = _dispatch_lane_invoke,
  6. .dq_activate = _dispatch_lane_activate,
  7. .dq_wakeup = _dispatch_lane_wakeup,
  8. .dq_push = _dispatch_lane_concurrent_push,
  9. );

进入_dispatch_lane_concurrent_push函数

  1. void
  2. _dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
  3. dispatch_qos_t qos)
  4. {
  5. // <rdar://problem/24738102&24743140> reserving non barrier width
  6. // doesn't fail if only the ENQUEUED bit is set (unlike its barrier
  7. // width equivalent), so we have to check that this thread hasn't
  8. // enqueued anything ahead of this call or we can break ordering
  9. if (dq->dq_items_tail == NULL &&
  10. !_dispatch_object_is_waiter(dou) &&
  11. !_dispatch_object_is_barrier(dou) &&
  12. _dispatch_queue_try_acquire_async(dq)) {
  13. return _dispatch_continuation_redirect_push(dq, dou, qos);
  14. }
  15. _dispatch_lane_push(dq, dou, qos);
  16. }

使用符号断点,先进入_dispatch_continuation_redirect_push函数

  1. static void
  2. _dispatch_continuation_redirect_push(dispatch_lane_t dl,
  3. dispatch_object_t dou, dispatch_qos_t qos)
  4. {
  5. if (likely(!_dispatch_object_is_redirection(dou))) {
  6. dou._dc = _dispatch_async_redirect_wrap(dl, dou);
  7. } else if (!dou._dc->dc_ctxt) {
  8. // find first queue in descending target queue order that has
  9. // an autorelease frequency set, and use that as the frequency for
  10. // this continuation.
  11. dou._dc->dc_ctxt = (void *)
  12. (uintptr_t)_dispatch_queue_autorelease_frequency(dl);
  13. }
  14. dispatch_queue_t dq = dl->do_targetq;
  15. if (!qos) qos = _dispatch_priority_qos(dq->dq_priority);
  16. dx_push(dq, dou, qos);
  17. }

再次调用dx_push,此时队列类型为queue_pthread_root,所以dx_push不是_dispatch_lane_concurrent_push函数,而是对应了_dispatch_root_queue_push函数,类似于调用了父类的方法
image.png

通过符号断点配合汇编代码查看流程:

_dispatch_root_queue_push_dispatch_root_queue_push_override_dispatch_root_queue_poke_dispatch_root_queue_poke_slow

进入_dispatch_root_queue_poke_slow函数

  1. static void
  2. _dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
  3. {
  4. ...
  5. _dispatch_root_queues_init();
  6. ...
  7. do {
  8. _dispatch_retain(dq); // released in _dispatch_worker_thread
  9. while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
  10. if (r != EAGAIN) {
  11. (void)dispatch_assume_zero(r);
  12. }
  13. _dispatch_temporary_resource_shortage();
  14. }
  15. } while (--remaining);
  16. ...
  17. }
  • 通过_dispatch_root_queues_init注册异步任务执行的回调
  • 通过do...while循环创建线程,使用pthread_create函数

使用lldb进行反推
image.png

  • 由系统的_pthread_wqthread调用libdispatch_dispatch_worker_thread2函数

源码中正向推导流程:

_dispatch_root_queues_init_dispatch_root_queues_init_once_dispatch_worker_thread2

_dispatch_worker_thread2进行赋值,封装给pthreadAPI调用
image.png

  • 异步线程被CPU调度,系统在合适的时机,通过libsystem_pthread.dylib_pthread_wqthread函数,调用libdispatch.dylib_dispatch_worker_thread2函数,最终执行异步函数的任务

异步任务回调流程:

_dispatch_worker_thread2_dispatch_root_queue_drain_dispatch_async_redirect_invoke_dispatch_continuation_pop_dispatch_client_callout_dispatch_call_block_and_release

异步函数配合并发队列的逻辑:

  • 首先对任务和优先级进行封装

  • 多次调用dx_push,最终找到_dispatch_root_queue_push

  • 通过_dispatch_root_queues_init注册异步任务执行的回调

  • 通过do...while循环创建线程,使用pthread_create函数

  • 异步线程被CPU调度,系统在合适的时机,通过libsystem_pthread.dylib调用_dispatch_worker_thread2函数,执行回调

6.3 全局队列

异步函数配全局队列

  1. dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
  2. dispatch_async(queue, ^{
  3. NSLog(@"block---:%@",[NSThread currentThread]);
  4. });

代码逻辑:dispatch_async_dispatch_continuation_async_dispatch_continuation_asyncdx_push

此时调用的dx_push,和并发队列的赋值不同

  1. DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
  2. .do_type = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
  3. .do_dispose = _dispatch_object_no_dispose,
  4. .do_debug = _dispatch_queue_debug,
  5. .do_invoke = _dispatch_object_no_invoke,
  6. .dq_activate = _dispatch_queue_no_activate,
  7. .dq_wakeup = _dispatch_root_queue_wakeup,
  8. .dq_push = _dispatch_root_queue_push,
  9. );

进入_dispatch_root_queue_push函数流程:

_dispatch_root_queue_push_dispatch_root_queue_push_override_dispatch_root_queue_push_inline_dispatch_root_queue_poke_dispatch_root_queue_poke_slow

进入_dispatch_root_queue_poke_slow函数:

  • 通过_dispatch_root_queues_init注册异步任务执行的回调
  • 通过do...while循环创建线程,使用pthread_create函数

使用lldb进行反推
image.png

异步任务回调流程:

_dispatch_worker_thread2_dispatch_root_queue_drain_dispatch_queue_override_invoke_dispatch_client_callout_dispatch_call_block_and_release

7. 单例模式

  1. static dispatch_once_t onceToken;
  2. dispatch_once(&onceToken, ^{
  3. NSLog(@"block---:%@",[NSThread currentThread]);
  4. });

来到源码,找到dispatch_once函数的实现

  1. void
  2. dispatch_once(dispatch_once_t *val, dispatch_block_t block)
  3. {
  4. dispatch_once_f(val, block, _dispatch_Block_invoke(block));
  5. }

进入dispatch_once_f函数

  1. void
  2. dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
  3. {
  4. dispatch_once_gate_t l = (dispatch_once_gate_t)val;
  5. #if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
  6. uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
  7. if (likely(v == DLOCK_ONCE_DONE)) {
  8. return;
  9. }
  10. #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
  11. if (likely(DISPATCH_ONCE_IS_GEN(v))) {
  12. return _dispatch_once_mark_done_if_quiesced(l, v);
  13. }
  14. #endif
  15. #endif
  16. if (_dispatch_once_gate_tryenter(l)) {
  17. return _dispatch_once_callout(l, ctxt, func);
  18. }
  19. return _dispatch_once_wait(l);
  20. }
  • val强转为dispatch_once_gate_t类型,类似于栅栏的使用

三个条件分支:

  • 如果执行完成,直接返回

  • 如果第一次,执行_dispatch_once_callout函数

  • 如果正在执行,进入_dispatch_once_wait等待

7.1 锁的处理

进入_dispatch_once_gate_tryenter函数

  1. static inline bool
  2. _dispatch_once_gate_tryenter(dispatch_once_gate_t l)
  3. {
  4. return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
  5. (uintptr_t)_dispatch_lock_value_for_self(), relaxed);
  6. }
  • 进行原子锁的处理,防止多线程

7.2 执行任务

进入_dispatch_once_callout函数

  1. static void
  2. _dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
  3. dispatch_function_t func)
  4. {
  5. _dispatch_client_callout(ctxt, func);
  6. _dispatch_once_gate_broadcast(l);
  7. }
  8. void
  9. _dispatch_client_callout(void *ctxt, dispatch_function_t f)
  10. {
  11. _dispatch_get_tsd_base();
  12. void *u = _dispatch_get_unwind_tsd();
  13. if (likely(!u)) return f(ctxt);
  14. _dispatch_set_unwind_tsd(NULL);
  15. f(ctxt);
  16. _dispatch_free_unwind_tsd();
  17. _dispatch_set_unwind_tsd(u);
  18. }
  • 通过f(ctxt)执行任务的回调

进入_dispatch_once_gate_broadcast函数

  1. static inline void
  2. _dispatch_once_gate_broadcast(dispatch_once_gate_t l)
  3. {
  4. dispatch_lock value_self = _dispatch_lock_value_for_self();
  5. uintptr_t v;
  6. #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
  7. v = _dispatch_once_mark_quiescing(l);
  8. #else
  9. v = _dispatch_once_mark_done(l);
  10. #endif
  11. if (likely((dispatch_lock)v == value_self)) return;
  12. _dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
  13. }
  • 锁的处理,并标记为完成

单例模式的原理:

  • 调用dispatch_once函数,传入onceTokenblock。其中onceToken为静态变量,具有唯一性,在底层被强转为dispatch_once_gate_t类型的变量ll通过os_atomic_load函数获取底层原子封装性的关联,得到变量v,通过v来查询任务的状态,如果此时v等于DLOCK_ONCE_DONE,说明任务已经处理过一次了,直接return

  • 如果任务首次执行,将任务进行加锁,任务状态置为DLOCK_ONCE_UNLOCK,目的保证线程安全。加锁后进行block回调函数的执行,执行完成后,将当前任务解锁,将当前的任务状态置为DLOCK_ONCE_DONE,在下次进来时,就不会在执行,会直接返回

  • 如果在当前任务执行期间,有其他任务进来,会进入无限次等待,原因是当前任务已经获取了锁,进行了加锁,其他任务是无法获取锁的

8. 线程池

8.1 创建线程

异步函数在_dispatch_root_queue_poke_slow中,如果是全局队列,使用_pthread_workqueue_addthreads函数创建并执行

  1. #if !DISPATCH_USE_INTERNAL_WORKQUEUE
  2. #if DISPATCH_USE_PTHREAD_ROOT_QUEUES
  3. if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
  4. #endif
  5. {
  6. _dispatch_root_queue_debug("requesting new worker thread for global "
  7. "queue: %p", dq);
  8. r = _pthread_workqueue_addthreads(remaining,
  9. _dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
  10. (void)dispatch_assume_zero(r);
  11. return;
  12. }
  13. #endif // !DISPATCH_USE_INTERNAL_WORKQUEUE

如果是普通队列,使用do...while进行线程池的创建,在创建之前,还要对线程池的状态进行判断

  1. int can_request, t_count;
  2. // seq_cst with atomic store to tail <rdar://problem/16932833>
  3. t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
  4. do {
  5. can_request = t_count < floor ? 0 : t_count - floor;
  6. if (remaining > can_request) {
  7. _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
  8. remaining, can_request);
  9. os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
  10. remaining = can_request;
  11. }
  12. if (remaining == 0) {
  13. _dispatch_root_queue_debug("pthread pool is full for root queue: "
  14. "%p", dq);
  15. return;
  16. }
  17. } while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
  18. t_count - remaining, &t_count, acquire));
  • 判断dgq_thread_pool_size,源码中标记为1
  • dgq_thread_pool_size会根据逻辑自增,加到最大值为止
  • remainingfloor为入参,传入10
  • 计算can_request线程数,如果t_count小于floor返回0,否则返回t_count减去floor的差值
  • 如果remaining线程数大于can_requestpthread线程池减少请求,以can_request线程数为准
  • 如果remaining0,表示根队列的pthread线程池已满

使用pthread_create函数,创建线程

  1. do {
  2. _dispatch_retain(dq); // released in _dispatch_worker_thread
  3. while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
  4. if (r != EAGAIN) {
  5. (void)dispatch_assume_zero(r);
  6. }
  7. _dispatch_temporary_resource_shortage();
  8. }
  9. } while (--remaining);

8.2 最大线程数

线程池最大线程数的设定

  1. int thread_pool_size = DISPATCH_WORKQ_MAX_PTHREAD_COUNT;
  2. #define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
  • 最大线程数设置255,但实际程序中开辟的线程数,不一定能达到这个最大值

官方文档中,辅助线程为512KB,辅助线程允许的最小堆栈大小为16KB,并且堆栈大小必须是4KB 的倍数

程序启动,系统给出的虚拟内存4GB,用户态占3GB,内核态占1GB。但内核态的1GB并不能全部用来开辟线程,所以最大线程数是未知的

  1. do {
  2. _dispatch_retain(dq); // released in _dispatch_worker_thread
  3. #if DISPATCH_DEBUG
  4. unsigned dwStackSize = 0;
  5. #else
  6. unsigned dwStackSize = 64 * 1024;
  7. #endif
  8. uintptr_t hThread = 0;
  9. while (!(hThread = _beginthreadex(NULL, dwStackSize, _dispatch_worker_thread_thunk, dq, STACK_SIZE_PARAM_IS_A_RESERVATION, NULL))) {
  10. if (errno != EAGAIN) {
  11. (void)dispatch_assume(hThread);
  12. }
  13. _dispatch_temporary_resource_shortage();
  14. }
  15. #if DISPATCH_USE_PTHREAD_ROOT_QUEUES
  16. if (_dispatch_mgr_sched.prio > _dispatch_mgr_sched.default_prio) {
  17. (void)dispatch_assume_zero(SetThreadPriority((HANDLE)hThread, _dispatch_mgr_sched.prio) == TRUE);
  18. }
  19. #endif
  20. CloseHandle((HANDLE)hThread);
  21. } while (--remaining);
  • 按照内核态1GB满载,最小堆栈大小为16KB计算,最大线程数可开辟64 * 1024。按照辅助线程512KB计算,最大线程数可开辟2048

9. 栅栏函数

iOS中有两种栅栏函数,都是用于控制任务的执⾏顺序

  • dispatch_barrier_async:异步栅栏函数,前面的任务执行完毕才会来到这里

  • dispatch_barrier_sync:同步栅栏函数,和异步栅栏函数的作用相同,但是同步栅栏函数会堵塞线程,影响后面的任务执行

使用栅栏函数的注意事项:

  • 栅栏函数只能控制同一并发队列

  • 同步栅栏函数添加队列,当前线程会被锁死,直到栅栏之前的任务和栅栏本身的任务执行完毕,当前线程才会继续执行

  • 全局并发队列不支持栅栏函数,因为可能会干扰系统级的任务执行

  • 如果是串行队列,使用栅栏函数的作用等同于一个同步函数,没有任何意义

栅栏函数还可用于线程安全,类似于锁的作用

  1. dispatch_queue_t concurrentQueue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);
  2. for (int i = 0; i<10000; i++) {
  3. dispatch_async(concurrentQueue, ^{
  4. dispatch_barrier_async(concurrentQueue, ^{
  5. [self.mArray addObject:[NSString stringWithFormat:@"%d",i]];
  6. });
  7. // @synchronized (self) {
  8. // [self.mArray addObject:[NSString stringWithFormat:@"%d",i]];
  9. // }
  10. });
  11. }
  • 此案例,如果不加栅栏函数,也不加互斥锁,使用并发队列多线程对同一数组进行addObject,很有可能会发生崩溃
  • 因为数据的写入,本质是对旧值的release,对新值的retain
  • 当数据不断releaseretain时,多线程会造成数据还没有retain完毕,就开始进行release,相当于加入空数据,进行release

9.1 同步栅栏函数分析

源码中,找到dispatch_barrier_sync函数的实现

  1. void
  2. dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work)
  3. {
  4. uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK;
  5. if (unlikely(_dispatch_block_has_private_data(work))) {
  6. return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
  7. }
  8. _dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
  9. }}

_dispatch_barrier_sync_f_dispatch_barrier_sync_f_inline

  1. static inline void
  2. _dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
  3. dispatch_function_t func, uintptr_t dc_flags)
  4. {
  5. dispatch_tid tid = _dispatch_tid_self();
  6. if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
  7. DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
  8. }
  9. dispatch_lane_t dl = upcast(dq)._dl;
  10. // The more correct thing to do would be to merge the qos of the thread
  11. // that just acquired the barrier lock into the queue state.
  12. //
  13. // However this is too expensive for the fast path, so skip doing it.
  14. // The chosen tradeoff is that if an enqueue on a lower priority thread
  15. // contends with this fast path, this thread may receive a useless override.
  16. //
  17. // Global concurrent queues and queues bound to non-dispatch threads
  18. // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
  19. if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
  20. return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
  21. DC_FLAG_BARRIER | dc_flags);
  22. }
  23. if (unlikely(dl->do_targetq->do_targetq)) {
  24. return _dispatch_sync_recurse(dl, ctxt, func,
  25. DC_FLAG_BARRIER | dc_flags);
  26. }
  27. _dispatch_introspection_sync_begin(dl);
  28. _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
  29. DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
  30. dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
  31. }
  • 逻辑中存在进入_dispatch_sync_f_slow函数的代码,证明同步栅栏函数也可能出现死锁的情况

_dispatch_sync_recurse_dispatch_sync_invoke_and_complete_recurse_dispatch_sync_complete_recurse

  1. static void
  2. _dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq,
  3. uintptr_t dc_flags)
  4. {
  5. bool barrier = (dc_flags & DC_FLAG_BARRIER);
  6. do {
  7. if (dq == stop_dq) return;
  8. if (barrier) {
  9. dx_wakeup(dq, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
  10. } else {
  11. _dispatch_lane_non_barrier_complete(upcast(dq)._dl, 0);
  12. }
  13. dq = dq->do_targetq;
  14. barrier = (dq->dq_width == 1);
  15. } while (unlikely(dq->do_targetq));
  16. }
  • 判断targetq,存在栅栏调用dx_wakeup等待
  • 否则,调用_dispatch_lane_non_barrier_complete函数

并发队列的dx_wakeup实现

  1. DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
  2. .do_type = DISPATCH_QUEUE_CONCURRENT_TYPE,
  3. .do_dispose = _dispatch_lane_dispose,
  4. .do_debug = _dispatch_queue_debug,
  5. .do_invoke = _dispatch_lane_invoke,
  6. .dq_activate = _dispatch_lane_activate,
  7. .dq_wakeup = _dispatch_lane_wakeup,
  8. .dq_push = _dispatch_lane_concurrent_push,
  9. );

进入_dispatch_lane_wakeup函数

  1. void
  2. _dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
  3. dispatch_wakeup_flags_t flags)
  4. {
  5. dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
  6. if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
  7. return _dispatch_lane_barrier_complete(dqu, qos, flags);
  8. }
  9. if (_dispatch_queue_class_probe(dqu)) {
  10. target = DISPATCH_QUEUE_WAKEUP_TARGET;
  11. }
  12. return _dispatch_queue_wakeup(dqu, qos, flags, target);
  13. }
  • 针对栅栏函数进行判断,进入_dispatch_lane_barrier_complete函数

进入_dispatch_lane_barrier_complete函数

  1. static void
  2. _dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos,
  3. dispatch_wakeup_flags_t flags)
  4. {
  5. dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
  6. dispatch_lane_t dq = dqu._dl;
  7. if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
  8. struct dispatch_object_s *dc = _dispatch_queue_get_head(dq);
  9. if (likely(dq->dq_width == 1 || _dispatch_object_is_barrier(dc))) {
  10. if (_dispatch_object_is_waiter(dc)) {
  11. return _dispatch_lane_drain_barrier_waiter(dq, dc, flags, 0);
  12. }
  13. } else if (dq->dq_width > 1 && !_dispatch_object_is_barrier(dc)) {
  14. return _dispatch_lane_drain_non_barriers(dq, dc, flags);
  15. }
  16. if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
  17. _dispatch_retain_2(dq);
  18. flags |= DISPATCH_WAKEUP_CONSUME_2;
  19. }
  20. target = DISPATCH_QUEUE_WAKEUP_TARGET;
  21. }
  22. uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
  23. dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
  24. return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned);
  25. }
  • 如果是串行队列,栅栏相当于同步函数,调用_dispatch_lane_drain_barrier_waiter函数
  • 如果是并发队列,调用_dispatch_lane_drain_non_barriers函数,进行栅栏相关处理
  • 栅栏之前的任务全部完成,调用_dispatch_lane_class_barrier_complete函数

9.2 全局队列中的栅栏函数

全局队列的dx_wakeup实现

  1. DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
  2. .do_type = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
  3. .do_dispose = _dispatch_object_no_dispose,
  4. .do_debug = _dispatch_queue_debug,
  5. .do_invoke = _dispatch_object_no_invoke,
  6. .dq_activate = _dispatch_queue_no_activate,
  7. .dq_wakeup = _dispatch_root_queue_wakeup,
  8. .dq_push = _dispatch_root_queue_push,
  9. );

进入_dispatch_root_queue_wakeup函数

  1. void
  2. _dispatch_root_queue_wakeup(dispatch_queue_global_t dq,
  3. DISPATCH_UNUSED dispatch_qos_t qos, dispatch_wakeup_flags_t flags)
  4. {
  5. if (!(flags & DISPATCH_WAKEUP_BLOCK_WAIT)) {
  6. DISPATCH_INTERNAL_CRASH(dq->dq_priority,
  7. "Don't try to wake up or override a root queue");
  8. }
  9. if (flags & DISPATCH_WAKEUP_CONSUME_2) {
  10. return _dispatch_release_2_tailcall(dq);
  11. }
  12. }
  • 全局队列中,没有对栅栏函数的任何判断和处理。所以,栅栏函数在全局队列中,和普通的同步或异步函数别无二致

10. 信号量

信号量可以让异步任务同步执行,可以当锁使用,并且能够控制GCD最大并发数

  1. dispatch_semaphore_t sem = dispatch_semaphore_create(0);
  2. for (int i = 0; i < 10; i++) {
  3. dispatch_async(queue, ^{
  4. sleep(1);
  5. NSLog(@"当前 - %d, 线程 - %@", i, [NSThread currentThread]);
  6. dispatch_semaphore_signal(sem);
  7. });
  8. dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
  9. }
  • dispatch_semaphore_create:初始化信号量,设置GCD最大并发数,必须>= 0

  • dispatch_semaphore_wait:等待,对信号量减1,相当于加锁

  • dispatch_semaphore_signal:释放,对信号量加1,相当于解锁

10.1 创建

进入dispatch_semaphore_create函数

  1. dispatch_semaphore_t
  2. dispatch_semaphore_create(intptr_t value)
  3. {
  4. dispatch_semaphore_t dsema;
  5. // If the internal value is negative, then the absolute of the value is
  6. // equal to the number of waiting threads. Therefore it is bogus to
  7. // initialize the semaphore with a negative value.
  8. if (value < 0) {
  9. return DISPATCH_BAD_INPUT;
  10. }
  11. dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
  12. sizeof(struct dispatch_semaphore_s));
  13. dsema->do_next = DISPATCH_OBJECT_LISTLESS;
  14. dsema->do_targetq = _dispatch_get_default_queue(false);
  15. dsema->dsema_value = value;
  16. _dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
  17. dsema->dsema_orig = value;
  18. return dsema;
  19. }
  • 初始化信号量,设置GCD最大并发数
  • 最大并发数必须>= 0

10.2 等待

进入dispatch_semaphore_wait函数

  1. intptr_t
  2. dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
  3. {
  4. long value = os_atomic_dec2o(dsema, dsema_value, acquire);
  5. if (likely(value >= 0)) {
  6. return 0;
  7. }
  8. return _dispatch_semaphore_wait_slow(dsema, timeout);
  9. }
  • os_atomic_dec2o宏,进行减1操作
  • 若信号量>= 0,直接返回0,执行wait之后的代码
  • 若信号量< 0,将阻塞当前线程,进入_dispatch_semaphore_wait_slow函数

进入_dispatch_semaphore_wait_slow函数

  1. static intptr_t
  2. _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
  3. dispatch_time_t timeout)
  4. {
  5. long orig;
  6. _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
  7. switch (timeout) {
  8. default:
  9. if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
  10. break;
  11. }
  12. // Fall through and try to undo what the fast path did to
  13. // dsema->dsema_value
  14. case DISPATCH_TIME_NOW:
  15. orig = dsema->dsema_value;
  16. while (orig < 0) {
  17. if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1,
  18. &orig, relaxed)) {
  19. return _DSEMA4_TIMEOUT();
  20. }
  21. }
  22. // Another thread called semaphore_signal().
  23. // Fall through and drain the wakeup.
  24. case DISPATCH_TIME_FOREVER:
  25. _dispatch_sema4_wait(&dsema->dsema_sema);
  26. break;
  27. }
  28. return 0;
  29. }
  • 根据timeout的值进行不同的逻辑处理

如果为DISPATCH_TIME_FOREVER类型,进入_dispatch_sema4_wait函数

  1. void
  2. _dispatch_sema4_wait(_dispatch_sema4_t *sema)
  3. {
  4. int ret = 0;
  5. do {
  6. ret = sem_wait(sema);
  7. } while (ret == -1 && errno == EINTR);
  8. DISPATCH_SEMAPHORE_VERIFY_RET(ret);
  9. }
  • 核心代码为do...while,通过循环使得下面的代码无法执行

10.3 释放

进入dispatch_semaphore_signal函数

  1. intptr_t
  2. dispatch_semaphore_signal(dispatch_semaphore_t dsema)
  3. {
  4. long value = os_atomic_inc2o(dsema, dsema_value, release);
  5. if (likely(value > 0)) {
  6. return 0;
  7. }
  8. if (unlikely(value == LONG_MIN)) {
  9. DISPATCH_CLIENT_CRASH(value,
  10. "Unbalanced call to dispatch_semaphore_signal()");
  11. }
  12. return _dispatch_semaphore_signal_slow(dsema);
  13. }
  • os_atomic_inc2o宏,进行加1操作
  • 若信号量> 0,直接返回0,继续执行后续代码
  • 若信号量等于LONG_MIN,抛出异常。这种情况表示wait操作过多,二者之间无法匹配。之后会调用_dispatch_semaphore_signal_slow函数,进入延迟等待

11. 调度组

调度组最直接的作⽤:控制任务执⾏顺序

  • dispatch_group_create:创建组
  • dispatch_group_async:进组任务
  • dispatch_group_notify:进组任务执行完毕通知
  • dispatch_group_wait:进组任务执行等待时间
  • dispatch_group_enter:进组
  • dispatch_group_leave:出组

使用进组任务

  1. - (void)testGCD{
  2. dispatch_group_t group = dispatch_group_create();
  3. dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
  4. dispatch_group_async(group, queue, ^{
  5. NSLog(@"接口1");
  6. });
  7. dispatch_group_async(group, queue, ^{
  8. NSLog(@"接口2");
  9. });
  10. dispatch_group_notify(group, dispatch_get_main_queue(), ^{
  11. NSLog(@"刷新");
  12. });
  13. }

使用进组、出组

  1. - (void)testGCD{
  2. dispatch_group_t group = dispatch_group_create();
  3. dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
  4. dispatch_group_enter(group);
  5. dispatch_async(queue, ^{
  6. NSLog(@"接口1");
  7. dispatch_group_leave(group);
  8. });
  9. dispatch_group_enter(group);
  10. dispatch_async(queue, ^{
  11. NSLog(@"接口2");
  12. dispatch_group_leave(group);
  13. });
  14. dispatch_group_notify(group, dispatch_get_main_queue(), ^{
  15. NSLog(@"刷新");
  16. });
  17. }
  • dispatch_group_enterdispatch_group_leave必须成对使用,否则会一直等待或出现异常

11.1 创建组

进入dispatch_group_create函数

  1. dispatch_group_t
  2. dispatch_group_create(void)
  3. {
  4. return _dispatch_group_create_with_count(0);
  5. }

进入_dispatch_group_create_with_count函数

  1. static inline dispatch_group_t
  2. _dispatch_group_create_with_count(uint32_t n)
  3. {
  4. dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
  5. sizeof(struct dispatch_group_s));
  6. dg->do_next = DISPATCH_OBJECT_LISTLESS;
  7. dg->do_targetq = _dispatch_get_default_queue(false);
  8. if (n) {
  9. os_atomic_store2o(dg, dg_bits,
  10. (uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
  11. os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
  12. }
  13. return dg;
  14. }
  • 创建dispatch_group_t结构体,参数n默认传入0

11.2 进组

进入dispatch_group_enter函数

  1. void
  2. dispatch_group_enter(dispatch_group_t dg)
  3. {
  4. // The value is decremented on a 32bits wide atomic so that the carry
  5. // for the 0 -> -1 transition is not propagated to the upper 32bits.
  6. uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
  7. DISPATCH_GROUP_VALUE_INTERVAL, acquire);
  8. uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
  9. if (unlikely(old_value == 0)) {
  10. _dispatch_retain(dg); // <rdar://problem/22318411>
  11. }
  12. if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
  13. DISPATCH_CLIENT_CRASH(old_bits,
  14. "Too many nested calls to dispatch_group_enter()");
  15. }
  16. }
  • 使用os_atomic_sub_orig2o宏,对dg_bits进行减1操作
  • old_bits只可能是-10两种可能
  • old_bitsDISPATCH_GROUP_VALUE_MASK进行&运算,将结果赋值给old_value
    • 如果old_bits0old_value0,调用_dispatch_retain函数
    • 如果old_bits-1old_valueDISPATCH_GROUP_VALUE_MASK,表示进组和出组函数使用不平衡,报出异常

11.3 出组

进入dispatch_group_leave函数

  1. void
  2. dispatch_group_leave(dispatch_group_t dg)
  3. {
  4. // The value is incremented on a 64bits wide atomic so that the carry for
  5. // the -1 -> 0 transition increments the generation atomically.
  6. uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
  7. DISPATCH_GROUP_VALUE_INTERVAL, release);
  8. uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
  9. if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
  10. old_state += DISPATCH_GROUP_VALUE_INTERVAL;
  11. do {
  12. new_state = old_state;
  13. if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
  14. new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
  15. new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
  16. } else {
  17. // If the group was entered again since the atomic_add above,
  18. // we can't clear the waiters bit anymore as we don't know for
  19. // which generation the waiters are for
  20. new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
  21. }
  22. if (old_state == new_state) break;
  23. } while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
  24. old_state, new_state, &old_state, relaxed)));
  25. return _dispatch_group_wake(dg, old_state, true);
  26. }
  27. if (unlikely(old_value == 0)) {
  28. DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
  29. "Unbalanced call to dispatch_group_leave()");
  30. }
  31. }
  • 使用os_atomic_add_orig2o宏,进行加1操作
  • &运算后的旧值等于DISPATCH_GROUP_VALUE_1,等待do...while停止循环,调用_dispatch_group_wake函数
  • DISPATCH_GROUP_VALUE_1等同于DISPATCH_GROUP_VALUE_MASK
  • 如果旧值为0,表示进组和出组函数使用不平衡,报出异常

进入_dispatch_group_wake函数

  1. static void
  2. _dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
  3. {
  4. uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
  5. if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
  6. dispatch_continuation_t dc, next_dc, tail;
  7. // Snapshot before anything is notified/woken <rdar://problem/8554546>
  8. dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
  9. do {
  10. dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
  11. next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
  12. _dispatch_continuation_async(dsn_queue, dc,
  13. _dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
  14. _dispatch_release(dsn_queue);
  15. } while ((dc = next_dc));
  16. refs++;
  17. }
  18. if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
  19. _dispatch_wake_by_address(&dg->dg_gen);
  20. }
  21. if (refs) _dispatch_release_n(dg, refs);
  22. }
  • 函数的作用,唤醒dispatch_group_notify函数
  • 核心代码在do...while循环中,调用_dispatch_continuation_async函数

进入_dispatch_continuation_async函数

  1. static inline void
  2. _dispatch_continuation_async(dispatch_queue_class_t dqu,
  3. dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
  4. {
  5. #if DISPATCH_INTROSPECTION
  6. if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
  7. _dispatch_trace_item_push(dqu, dc);
  8. }
  9. #else
  10. (void)dc_flags;
  11. #endif
  12. return dx_push(dqu._dq, dc, qos);
  13. }
  • 最终调用dx_push

11.4 通知

进入dispatch_group_notify函数

  1. static inline void
  2. _dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
  3. dispatch_continuation_t dsn)
  4. {
  5. uint64_t old_state, new_state;
  6. dispatch_continuation_t prev;
  7. dsn->dc_data = dq;
  8. _dispatch_retain(dq);
  9. prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
  10. if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
  11. os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
  12. if (os_mpsc_push_was_empty(prev)) {
  13. os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
  14. new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
  15. if ((uint32_t)old_state == 0) {
  16. os_atomic_rmw_loop_give_up({
  17. return _dispatch_group_wake(dg, new_state, false);
  18. });
  19. }
  20. });
  21. }
  22. }
  • 判断状态为0,调用_dispatch_group_wake函数
  • 所以通知并不需要一直等待,因为dispatch_group_notifydispatch_group_leave中,都有_dispatch_group_wake函数的调用

11.5 任务

进入dispatch_group_async函数

  1. void
  2. dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
  3. dispatch_block_t db)
  4. {
  5. dispatch_continuation_t dc = _dispatch_continuation_alloc();
  6. uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
  7. dispatch_qos_t qos;
  8. qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
  9. _dispatch_continuation_group_async(dg, dq, dc, qos);
  10. }

进入_dispatch_continuation_group_async函数

  1. static inline void
  2. _dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
  3. dispatch_continuation_t dc, dispatch_qos_t qos)
  4. {
  5. dispatch_group_enter(dg);
  6. dc->dc_data = dg;
  7. _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
  8. }
  • 调用dispatch_group_enter函数,进行进组操作

源码中搜索_dispatch_client_callout函数,在_dispatch_continuation_with_group_invoke函数中,同样调用dispatch_group_leave函数,进行出组操作

  1. static inline void
  2. _dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
  3. {
  4. struct dispatch_object_s *dou = dc->dc_data;
  5. unsigned long type = dx_type(dou);
  6. if (type == DISPATCH_GROUP_TYPE) {
  7. _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
  8. _dispatch_trace_item_complete(dc);
  9. dispatch_group_leave((dispatch_group_t)dou);
  10. } else {
  11. DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
  12. }
  13. }

12. dispatch_source

dispatch_source是基础数据类型,用于协调特定底层系统事件的处理

12.1 基本介绍

dispatch_source替代了异步回调函数,来处理系统相关的事件。当配置一个dispatch时,你需要指定监测的事件、队列、以及任务回调。当事件发生时,dispatch source会提交block或函数到指定的queue去执行

使用dispatch_source代替dispatch_async的原因在于联结的优势

联结:在任一线程上调用它的一个函数dispatch_source_merge_data后,会执行Dispatch Source事先定义好的句柄(可以把句柄简单理解为一个block),这个过程叫Custom event,用户事件。是dispatch source支持处理的一种事件

句柄:是一种指向指针的指针,它指向的就是一个类或者结构,它和系统有密切的关系,包含:

  • 实例句柄HINSTANCE
  • 位图句柄HBITMAP
  • 设备表句柄HDC
  • 图标句柄HICON
  • 通用句柄HANDLE

简单来说:这种事件是由你调用dispatch_source_merge_data函数来向自己发出的信号

使用dispatch_source的优点:

  • CPU负荷非常小,尽量不占用资源
  • 联结的优势

12.2 创建dispatch source

使用dispatch_source_create函数

  1. dispatch_source_t source = dispatch_source_create(dispatch_source_type_t type, uintptr_t handle, unsigned long mask, dispatch_queue_t queue)
  • typedispatch源可处理的事件
  • handle:可以理解为句柄、索引或ID,假如要监听进程,需要传入进程ID
  • mask:可以理解为描述,提供更详细的描述,让它知道具体要监听什么
  • queue:自定义源需要的一个队列,用来处理所有的响应句柄

dispatch sourcetype的类型:

  • DISPATCH_SOURCE_TYPE_DATA_ADD:自定义的事件,变量增加
  • DISPATCH_SOURCE_TYPE_DATA_OR:自定义的事件,变量OR
  • DISPATCH_SOURCE_TYPE_MACH_SENDMACH:端口发送
  • DISPATCH_SOURCE_TYPE_MACH_RECVMACH:端口接收
  • DISPATCH_SOURCE_TYPE_MEMORYPRESSURE:内存压力 (注:iOS8后可用)
  • DISPATCH_SOURCE_TYPE_PROC:进程监听,如进程的退出、创建一个或更多的子线程、进程收到UNIX信号
  • DISPATCH_SOURCE_TYPE_READIO操作,如对文件的操作、socket操作的读响应
  • DISPATCH_SOURCE_TYPE_SIGNAL:接收到UNIX信号时响应
  • DISPATCH_SOURCE_TYPE_TIMER:定时器
  • DISPATCH_SOURCE_TYPE_VNODE:文件状态监听,文件被删除、移动、重命名
  • DISPATCH_SOURCE_TYPE_WRITEIO操作,如对文件的操作、socket操作的写响应

12.3 常用API

  1. //挂起队列
  2. dispatch_suspend(queue)
  3. //分派源创建时默认处于暂停状态,在分派源分派处理程序之前必须先恢复
  4. dispatch_resume(source)
  5. //向分派源发送事件,需要注意的是,不可以传递0值(事件不会被触发),同样也不可以传递负数。
  6. dispatch_source_merge_data
  7. //设置响应分派源事件的block,在分派源指定的队列上运行
  8. dispatch_source_set_event_handler
  9. //得到分派源的数据
  10. dispatch_source_get_data
  11. //得到dispatch源创建,即调用dispatch_source_create的第二个参数
  12. uintptr_t dispatch_source_get_handle(dispatch_source_t source);
  13. //得到dispatch源创建,即调用dispatch_source_create的第三个参数
  14. unsigned long dispatch_source_get_mask(dispatch_source_t source);
  15. ////取消dispatch源的事件处理--即不再调用block。如果调用dispatch_suspend只是暂停dispatch源。
  16. void dispatch_source_cancel(dispatch_source_t source);
  17. //检测是否dispatch源被取消,如果返回非0值则表明dispatch源已经被取消
  18. long dispatch_source_testcancel(dispatch_source_t source);
  19. //dispatch源取消时调用的block,一般用于关闭文件或socket等,释放相关资源
  20. void dispatch_source_set_cancel_handler(dispatch_source_t source, dispatch_block_t cancel_handler);
  21. //可用于设置dispatch源启动时调用block,调用完成后即释放这个block。也可在dispatch源运行当中随时调用这个函数。
  22. void dispatch_source_set_registration_handler(dispatch_source_t source, dispatch_block_t registration_handler);

12.4 timer的封装

打开SparkTimer.h文件,写入以下代码:

  1. #import <Foundation/Foundation.h>
  2. @class SparkTimer;
  3. typedef void (^TimerBlock)(SparkTimer * _Nonnull timer);
  4. @interface SparkTimer : NSObject
  5. + (SparkTimer *)scheduledTimerWithTimeInterval:(NSTimeInterval)ti target:(id)aTarget selector:(SEL)aSelector userInfo:(nullable id)userInfo repeats:(BOOL)yesOrNo immediately:(BOOL)isImmediately;
  6. + (SparkTimer *)scheduledTimerWithTimeInterval:(NSTimeInterval)ti userInfo:(nullable id)userInfo repeats:(BOOL)yesOrNo immediately:(BOOL)isImmediately timerBlock:(TimerBlock)block;
  7. - (void)invalidate;
  8. @property (nonatomic, readonly, getter=isValid) BOOL valid;
  9. @property (nonatomic, nullable, readonly, retain) id userInfo;
  10. @end

打开SparkTimer.m文件,写入以下代码:

  1. #import "SparkTimer.h"
  2. @interface SparkTimer ()
  3. @property (nonatomic, assign) NSTimeInterval interval;
  4. @property (nonatomic, nullable, readwrite, retain) id userInfo;
  5. @property (nonatomic, assign) BOOL repeats;
  6. @property (nonatomic, assign) BOOL immediately;
  7. @property (nonatomic, strong) dispatch_source_t timer;
  8. @property (nonatomic, readwrite, getter=isValid) BOOL valid;
  9. @end
  10. @implementation SparkTimer
  11. + (SparkTimer *)scheduledTimerWithTimeInterval:(NSTimeInterval)ti target:(id)aTarget selector:(SEL)aSelector userInfo:(nullable id)userInfo repeats:(BOOL)yesOrNo immediately:(BOOL)isImmediately{
  12. return [[SparkTimer alloc] initWithInterval:ti target:aTarget selector:aSelector userInfo:userInfo repeats:yesOrNo immediately:isImmediately timerBlock:nil isBlock:NO];
  13. }
  14. + (SparkTimer *)scheduledTimerWithTimeInterval:(NSTimeInterval)ti userInfo:(nullable id)userInfo repeats:(BOOL)yesOrNo immediately:(BOOL)isImmediately timerBlock:(TimerBlock)block{
  15. return [[SparkTimer alloc] initWithInterval:ti target:nil selector:nil userInfo:userInfo repeats:yesOrNo immediately:isImmediately timerBlock:block isBlock:YES];
  16. }
  17. - (instancetype)initWithInterval:(NSTimeInterval)ti target:(id)aTarget selector:(SEL)aSelector userInfo:(nullable id)userInfo repeats:(BOOL)yesOrNo immediately:(BOOL)isImmediately timerBlock:(TimerBlock)block isBlock:(BOOL)isBlock
  18. {
  19. self = [super init];
  20. if (self) {
  21. _interval = ti;
  22. _userInfo = userInfo;
  23. _repeats = yesOrNo;
  24. _immediately = isImmediately;
  25. _valid = NO;
  26. @weakify(self)
  27. [self createTimer:^{
  28. @strongify(self)
  29. if(!isBlock){
  30. [self callOutWithTarget:aTarget selector:aSelector];
  31. return;
  32. }
  33. [self callOutWithTimerBlock:block];
  34. }];
  35. }
  36. return self;
  37. }
  38. - (void)callOutWithTarget:(id)aTarget selector:(SEL)aSelector{
  39. if(!aTarget || !aSelector){
  40. [self invalidate];
  41. return;
  42. }
  43. [aTarget performSelector:aSelector withObject:self];
  44. [self checkRepeats];
  45. }
  46. - (void)callOutWithTimerBlock:(TimerBlock)block{
  47. if(!block){
  48. [self invalidate];
  49. return;
  50. }
  51. block(self);
  52. [self checkRepeats];
  53. }
  54. - (void)checkRepeats{
  55. if(self.repeats){
  56. return;
  57. }
  58. [self invalidate];
  59. }
  60. - (void)createTimer:(void(^)(void))block{
  61. _valid = YES;
  62. //1.创建队列
  63. dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
  64. //2.创建timer
  65. _timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
  66. dispatch_time_t start;
  67. if(self.immediately){
  68. start = DISPATCH_TIME_NOW;
  69. }
  70. else{
  71. start = dispatch_time(DISPATCH_TIME_NOW, self.interval * NSEC_PER_SEC);
  72. }
  73. //3.设置timer首次执行时间,间隔,精确度
  74. dispatch_source_set_timer(_timer, start, self.interval * NSEC_PER_SEC, 0.1 * NSEC_PER_SEC);
  75. //4.设置timer事件回调
  76. dispatch_source_set_event_handler(_timer, ^{
  77. NSLog(@"计时");
  78. block();
  79. });
  80. //5.默认是挂起状态,需要手动激活
  81. dispatch_resume(_timer);
  82. }
  83. - (void)invalidate{
  84. if(!self.isValid){
  85. return;
  86. }
  87. _valid = NO;
  88. dispatch_source_cancel(_timer);
  89. }
  90. - (id)userInfo{
  91. return _userInfo;
  92. }
  93. @end

总结

创建队列:

  • 串行队列的dq_atomic_flags1
  • dq_serialnum:标记队列类型
  • 队列也是对象,也有isa指向
  • 队列通过模板创建

同步函数:

  • 当同步函数加入到串行队列中,同一线程在等待时又被执行,就会形成相互等待的局面,造成死锁的异常

异步函数:

  • 通过dx_push递归,会重定向到根队列,然后通过pthread_creat创建线程,最后通过dx_invoke执行block回调

单例模式:

  • 底层包含锁的处理,线程安全

线程池:

  • 使用pthread_create函数,创建线程
  • 最大线程数是未知的
  • 如果按照内核态1GB满载,最小堆栈大小为16KB计算,最大线程数可开辟64 * 1024。按照辅助线程512KB计算,最大线程数可开辟2048

栅栏函数:

  • 异步栅栏函数阻塞的是队列,而且必须是自定义的并发队列,不影响主线程任务的执行
  • 同步栅栏函数阻塞的是线程,且是主线程,会影响主线程其他任务的执行

信号量:

  • dispatch_semaphore_create:初始化信号量,设置GCD最大并发数,必须>= 0
  • dispatch_semaphore_wait:等待,对信号量减1,相当于加锁
    • 若信号量>= 0,直接返回0,执行wait之后的代码
    • 若信号量< 0,将阻塞当前线程,进入_dispatch_semaphore_wait_slow函数
  • dispatch_semaphore_signal:释放,对信号量加1,相当于解锁

调度组:

  • dispatch_group_enterdispatch_group_leave必须成对使用
  • dispatch_group_enter:进行减1操作,执行后为-1
  • dispatch_group_leave:进行加1操作,执行后为0
  • dispatch_group_notify:判断状态为0,调用_dispatch_group_wake函数
    • 通知函数不需要一直等待,因为在dispatch_group_leave中也有_dispatch_group_wake函数的调用,同样可以唤醒block任务
  • dispatch_group_async:内部封装了dispatch_group_enterdispatch_group_leave