1. 主队列分析
主队列是GCD
提供的特殊的串行队列,在libdispatch
源码中,找到主队列的定义,找到它是串行队列的依据
1.1 找到主队列的类型实现
1.1 方式一
搜索dispatch_get_main_queue
关键字
DISPATCH_INLINE DISPATCH_ALWAYS_INLINE DISPATCH_CONST DISPATCH_NOTHROW
dispatch_queue_main_t
dispatch_get_main_queue(void)
{
return DISPATCH_GLOBAL_OBJECT(dispatch_queue_main_t, _dispatch_main_q);
}
参数2
为队列类型,在源码中搜索_dispatch_main_q
,找到的结果非常多,这时可以尝试搜索_dispatch_main_q =
,如果是赋值操作,可以缩小结果的范围
搜索_dispatch_main_q =
,找到实现代码
struct dispatch_queue_static_s _dispatch_main_q = {
DISPATCH_GLOBAL_OBJECT_HEADER(queue_main),
#if !DISPATCH_USE_RESOLVERS
.do_targetq = _dispatch_get_default_queue(true),
#endif
.dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1) |
DISPATCH_QUEUE_ROLE_BASE_ANON,
.dq_label = "com.apple.main-thread",
.dq_atomic_flags = DQF_THREAD_BOUND | DQF_WIDTH(1),
.dq_serialnum = 1,
};
1.2 方式二
在项目中,创建主队列,使用lldb
,可以输出它的结构,其中包含队列标识符
dispatch_queue_t queue = dispatch_get_main_queue();
-------------------------
(lldb) po queue
<OS_dispatch_queue_main: com.apple.main-thread[0x104a6cc80] = { xref = -2147483648, ref = -2147483648, sref = 1, target = com.apple.root.default-qos.overcommit[0x104a6d100], width = 0x1, state = 0x001ffe9000000300, dirty, in-flight = 0, thread = 0x303 }>
来到libdispatch
源码中,直接搜索com.apple.main-thread
,也可以直接定位到它的实现代码
1.2 主队列是串行队列的依据
实现代码中,主队列的dq_atomic_flags
为DQF_WIDTH(1)
,我们可以通过找到串行队列的dq_atomic_flags
进行对比,如果一致,即可证明主队列就是一个串行队列
在源码中,搜索dispatch_queue_create
dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
return _dispatch_lane_create_with_target(label, attr,
DISPATCH_TARGET_QUEUE_DEFAULT, true);
}
进入_dispatch_lane_create_with_target
函数,找到创建队列的核心代码
- 创建队列,开辟空间
- 调用构造函数,如果
dqai_concurrent
非真,传入1
进入_dispatch_queue_init
函数,找到参数3
如果是串行队列,传入的width
为1
,dqf
为DQF_WIDTH(1)
,和主队列dq_atomic_flags
属性设置的值一致,所以主队列就是一个串行队列
1.3 dq_serialnum
的作用
有一些说法,如果dq_serialnum
设置为1
,就是串行队列,例如:主队列的设置
在源码中,搜索dq_serialnum
,可以找到很多赋值的场景
进入DISPATCH_QUEUE_SERIAL_NUMBER_WLF
的定义
同时找到了dq_serialnum
的相关注释
跳过零
主队列:
1
mgr_q
:2
mgr_root_q
:3
全局队列:
4~15
自定义队列:
17
所以dq_serialnum
并不是用来标记串行和并发队列的
1.4 自定义队列的dq_serialnum
搜索DISPATCH_QUEUE_SERIAL_NUMBER_INIT
,找到被赋值的代码
unsigned long volatile _dispatch_queue_serial_numbers =
DISPATCH_QUEUE_SERIAL_NUMBER_INIT;
搜索_dispatch_queue_serial_numbers
找到宏定义
#define os_atomic_inc_orig(p, m) \
os_atomic_add_orig((p), 1, m)
#define os_atomic_add_orig(p, v, m) \
_os_atomic_c11_op_orig((p), (v), m, add, +)
#define _os_atomic_c11_op_orig(p, v, m, o, op) \
atomic_fetch_##o##_explicit(_os_atomic_c11_atomic(p), v, \
memory_order_##m)
atomic_fetch_##o##_explicit
中的##o##
为占位,替换的o
传入的值为add
替换后为
atomic_fetch_add_explicit
,来自C++11
的原子操作函数宏的作用,传入
17
,之后每次+1
2. 全局队列分析
在lldb
中,输出全局队列的标识符
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
-------------------------
(lldb) po queue
<OS_dispatch_queue_global: com.apple.root.default-qos[0x102b85080] = { xref = -2147483648, ref = -2147483648, sref = 1, target = [0x0], width = 0xfff, state = 0x0060000000000000, in-barrier}>
来到libdispatch
源码中,搜索com.apple.root.default-qos
struct dispatch_queue_global_s _dispatch_root_queues[] = {
#define _DISPATCH_ROOT_QUEUE_IDX(n, flags) \
((flags & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) ? \
DISPATCH_ROOT_QUEUE_IDX_##n##_QOS_OVERCOMMIT : \
DISPATCH_ROOT_QUEUE_IDX_##n##_QOS)
#define _DISPATCH_ROOT_QUEUE_ENTRY(n, flags, ...) \
[_DISPATCH_ROOT_QUEUE_IDX(n, flags)] = { \
DISPATCH_GLOBAL_OBJECT_HEADER(queue_global), \
.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, \
.do_ctxt = _dispatch_root_queue_ctxt(_DISPATCH_ROOT_QUEUE_IDX(n, flags)), \
.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL), \
.dq_priority = flags | ((flags & DISPATCH_PRIORITY_FLAG_FALLBACK) ? \
_dispatch_priority_make_fallback(DISPATCH_QOS_##n) : \
_dispatch_priority_make(DISPATCH_QOS_##n, 0)), \
__VA_ARGS__ \
}
_DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, 0,
.dq_label = "com.apple.root.maintenance-qos",
.dq_serialnum = 4,
),
_DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.maintenance-qos.overcommit",
.dq_serialnum = 5,
),
_DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, 0,
.dq_label = "com.apple.root.background-qos",
.dq_serialnum = 6,
),
_DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.background-qos.overcommit",
.dq_serialnum = 7,
),
_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, 0,
.dq_label = "com.apple.root.utility-qos",
.dq_serialnum = 8,
),
_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.utility-qos.overcommit",
.dq_serialnum = 9,
),
_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_FALLBACK,
.dq_label = "com.apple.root.default-qos",
.dq_serialnum = 10,
),
_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
DISPATCH_PRIORITY_FLAG_FALLBACK | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.default-qos.overcommit",
.dq_serialnum = 11,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, 0,
.dq_label = "com.apple.root.user-initiated-qos",
.dq_serialnum = 12,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.user-initiated-qos.overcommit",
.dq_serialnum = 13,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, 0,
.dq_label = "com.apple.root.user-interactive-qos",
.dq_serialnum = 14,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.user-interactive-qos.overcommit",
.dq_serialnum = 15,
),
};
_dispatch_root_queues
为数组类型,默认将4~15
的全局队列全部初始化,获取时通过不同类型,获取不同的队列和主队列的类型差异
主队列:
dispatch_queue_static_s
全局队列:
dispatch_queue_global_s
3. 创建队列
搜索dispatch_queue_create
,进入_dispatch_lane_create_with_target
函数
DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
dispatch_queue_t tq, bool legacy)
{
// 1、创建dqai
dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
// 2、规范化参数,例如:qos, overcommit, tq
dispatch_qos_t qos = dqai.dqai_qos;
...
// 3、根据队列类型,创建vtable
if (legacy) {
// if any of these attributes is specified, use non legacy classes
if (dqai.dqai_inactive || dqai.dqai_autorelease_frequency) {
legacy = false;
}
}
const void *vtable;
dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0;
if (dqai.dqai_concurrent) {
vtable = DISPATCH_VTABLE(queue_concurrent);
} else {
vtable = DISPATCH_VTABLE(queue_serial);
}
...
// 4、开辟空间,初始化队列
dispatch_lane_t dq = _dispatch_object_alloc(vtable,
sizeof(struct dispatch_lane_s));
_dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
(dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
dq->dq_label = label;
...
// 5、根据不同类型,通过模板创建队列
dq->do_targetq = tq;
_dispatch_object_debug(dq, "%s", __func__);
return _dispatch_trace_queue_create(dq)._dq;
}
2.1 创建dqai
进入_dispatch_queue_attr_to_info
函数
创建空的
dqai
,默认为串行队列。如果传入的参数为空,返回空dqai
传入的参数有值,将队列信息保存到
dqai
中
2.2 规范化参数
- 设置优先级、服务质量等参数
2.3 创建vtable
通过DISPATCH_VTABLE
,将传入的队列类型拼接vtable
找到DISPATCH_VTABLE
宏的定义
#define DISPATCH_VTABLE(name) DISPATCH_OBJC_CLASS(name)
#define DISPATCH_OBJC_CLASS(name) (&DISPATCH_CLASS_SYMBOL(name))
#define DISPATCH_CLASS_SYMBOL(name) _dispatch_##name##_vtable
2.4 初始化队列
使用_dispatch_object_alloc
函数开辟空间,进入_os_object_alloc_realized
函数
- 包含
isa
指向,说明队列也是一个对象
使用_dispatch_queue_init
函数初始化队列
- 队列类型为
dispatch_queue_t
,为队列的成员变量赋值
2.5 通过模板创建队列
使用_dispatch_introspection_queue_create
函数,传入初始化后的dq
流程:_dispatch_introspection_queue_create
→_dispatch_introspection_queue_create_hook
→dispatch_introspection_queue_get_info
→_dispatch_introspection_lane_get_info
4. 继承链
使用_dispatch_lane_create_with_target
创建队列,返回dispatch_queue_t
类型
dispatch_queue_t
类型来自于宏定义
DISPATCH_DECL(dispatch_queue);
#define DISPATCH_DECL(name) OS_OBJECT_DECL_SUBCLASS(name, dispatch_object)
#define OS_OBJECT_DECL_SUBCLASS(name, super) \
OS_OBJECT_DECL_IMPL(name, NSObject, <OS_OBJECT_CLASS(super)>)
#define OS_OBJECT_DECL_IMPL(name, adhere, ...) \
OS_OBJECT_DECL_PROTOCOL(name, __VA_ARGS__) \
typedef adhere<OS_OBJECT_CLASS(name)> \
* OS_OBJC_INDEPENDENT_CLASS name##_t
#define OS_OBJECT_DECL_PROTOCOL(name, ...) \
@protocol OS_OBJECT_CLASS(name) __VA_ARGS__ \
@end
#define OS_OBJECT_CLASS(name) OS_##name
第一步:使用
OS_OBJECT_CLASS
在前面拼接OS_
,例如:OS_dispatch_queue
第二步:使用
OS_OBJECT_DECL_PROTOCOL
在结尾拼接_t
,例如:OS_dispatch_queue_t
第三步:使用泛型接收,
typedef adhere<OS_dispatch_queue> * OS_OBJC_INDEPENDENT_CLASS OS_dispatch_queue_t
DISPATCH_DECL
宏在dispatch_object_s
结构体中的定义
#define DISPATCH_DECL(name) \
typedef struct name##_s : public dispatch_object_s {} *name##_t
typedef struct dispatch_queue_s : public dispatch_object_s {} *dispatch_queue_t
dispatch_queue_t
的本质是dispatch_queue_s
,dispatch_queue_s
继承自dispatch_object_s
dispatch_object_s
来自于dispatch_object_t
的联合体
typedef union {
struct _os_object_s *_os_obj;
struct dispatch_object_s *_do;
struct dispatch_queue_s *_dq;
struct dispatch_queue_attr_s *_dqa;
struct dispatch_group_s *_dg;
struct dispatch_source_s *_ds;
struct dispatch_channel_s *_dch;
struct dispatch_mach_s *_dm;
struct dispatch_mach_msg_s *_dmsg;
struct dispatch_semaphore_s *_dsema;
struct dispatch_data_s *_ddata;
struct dispatch_io_s *_dchannel;
} dispatch_object_t DISPATCH_TRANSPARENT_UNION;
- 所以最终的根类是
dispatch_object_t
找到dispatch_queue_s
的结构
struct dispatch_queue_s {
DISPATCH_QUEUE_CLASS_HEADER(queue, void *__dq_opaque1);
/* 32bit hole on LP64 */
} DISPATCH_ATOMIC64_ALIGN;
找到结构体内嵌套的DISPATCH_QUEUE_CLASS_HEADER
定义
#define DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__) \
_DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__); \
/* LP64 global queue cacheline boundary */ \
unsigned long dq_serialnum; \
const char *dq_label; \
DISPATCH_UNION_LE(uint32_t volatile dq_atomic_flags, \
const uint16_t dq_width, \
const uint16_t __dq_opaque2 \
); \
dispatch_priority_t dq_priority; \
union { \
struct dispatch_queue_specific_head_s *dq_specific_head; \
struct dispatch_source_refs_s *ds_refs; \
struct dispatch_timer_source_refs_s *ds_timer_refs; \
struct dispatch_mach_recv_refs_s *dm_recv_refs; \
struct dispatch_channel_callbacks_s const *dch_callbacks; \
}; \
int volatile dq_sref_cnt
#define _DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__) \
DISPATCH_OBJECT_HEADER(x); \
__pointer_sized_field__; \
DISPATCH_UNION_LE(uint64_t volatile dq_state, \
dispatch_lock dq_state_lock, \
uint32_t dq_state_bits \
)
#define DISPATCH_OBJECT_HEADER(x) \
struct dispatch_object_s _as_do[0]; \
_DISPATCH_OBJECT_HEADER(x)
DISPATCH_OBJECT_HEADER
中包含dispatch_object_s
结构体和_DISPATCH_OBJECT_HEADER
找到_DISPATCH_OBJECT_HEADER
的定义
#define _DISPATCH_OBJECT_HEADER(x) \
struct _os_object_s _as_os_obj[0]; \
OS_OBJECT_STRUCT_HEADER(dispatch_##x); \
struct dispatch_##x##_s *volatile do_next; \
struct dispatch_queue_s *do_targetq; \
void *do_ctxt; \
union { \
dispatch_function_t DISPATCH_FUNCTION_POINTER do_finalizer; \
void *do_introspection_ctxt; \
}
_DISPATCH_OBJECT_HEADER
中包含_os_object_s
结构体和OS_OBJECT_STRUCT_HEADER
找到OS_OBJECT_STRUCT_HEADER
的定义
#define OS_OBJECT_STRUCT_HEADER(x) \
_OS_OBJECT_HEADER(\
const struct x##_vtable_s *__ptrauth_objc_isa_pointer do_vtable, \
do_ref_cnt, \
do_xref_cnt)
#define _OS_OBJECT_HEADER(isa, ref_cnt, xref_cnt) \
isa; /* must be pointer-sized and use __ptrauth_objc_isa_pointer */ \
int volatile ref_cnt; \
int volatile xref_cnt
- 包含
isa
成员变量
所以dispatch_queue_s
结构体还伪继承于_os_object_s
,并包含isa
成员变量,说明队列也是一个对象
5. 同步函数
5.1 全局队列
同步函数配合全局队列
dispatch_sync(dispatch_get_global_queue(0, 0), ^{
NSLog(@"Block");
});
在libdispatch
源码中,找到dispatch_sync
函数的实现
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
uintptr_t dc_flags = DC_FLAG_BLOCK;
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
}
_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
- 传入的
work
参数就是任务的block
,只需要一直追寻work
参数就能找到调用的代码 - 传入
_dispatch_sync_f
函数的参数3
,对work
进行包装
找到_dispatch_Block_invoke
的定义
#define _dispatch_Block_invoke(bb) \
((dispatch_function_t)((struct Block_layout *)bb)->invoke)
- 将
block
下的invoke
强转为dispatch_function_t
结构体
进入_dispatch_sync_f
函数
static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
uintptr_t dc_flags)
{
_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}
ctxt
:block
func
:invoke
进入_dispatch_sync_f_inline
函数
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
if (likely(dq->dq_width == 1)) {
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
}
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
}
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}
- 函数中出现了复杂的逻辑,可以使用符号断点,找到正确的逻辑分支
在项目中,将所有函数设置符号断点
- 进入
_dispatch_sync_f_slow
函数
进入_dispatch_sync_f_slow
函数
DISPATCH_NOINLINE
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
dispatch_function_t func, uintptr_t top_dc_flags,
dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
dispatch_queue_t top_dq = top_dqu._dq;
dispatch_queue_t dq = dqu._dq;
if (unlikely(!dq->do_targetq)) {
return _dispatch_sync_function_invoke(dq, ctxt, func);
}
pthread_priority_t pp = _dispatch_get_priority();
struct dispatch_sync_context_s dsc = {
.dc_flags = DC_FLAG_SYNC_WAITER | dc_flags,
.dc_func = _dispatch_async_and_wait_invoke,
.dc_ctxt = &dsc,
.dc_other = top_dq,
.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
.dc_voucher = _voucher_get(),
.dsc_func = func,
.dsc_ctxt = ctxt,
.dsc_waiter = _dispatch_tid_self(),
};
_dispatch_trace_item_push(top_dq, &dsc);
__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
if (dsc.dsc_func == NULL) {
// dsc_func being cleared means that the block ran on another thread ie.
// case (2) as listed in _dispatch_async_and_wait_f_slow.
dispatch_queue_t stop_dq = dsc.dc_other;
return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
}
_dispatch_introspection_sync_begin(top_dq);
_dispatch_trace_item_pop(top_dq, &dsc);
_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
DISPATCH_TRACE_ARG(&dsc));
}
- 同样是复杂逻辑,继续使用符号断点
在项目中,将所有可能的条件分支,全部设置符号断点
- 进入
_dispatch_sync_function_invoke
函数
进入_dispatch_sync_function_invoke
函数
DISPATCH_NOINLINE
static void
_dispatch_sync_function_invoke(dispatch_queue_class_t dq, void *ctxt,
dispatch_function_t func)
{
_dispatch_sync_function_invoke_inline(dq, ctxt, func);
}
进入_dispatch_sync_function_invoke_inline
函数
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
dispatch_function_t func)
{
dispatch_thread_frame_s dtf;
_dispatch_thread_frame_push(&dtf, dq);
_dispatch_client_callout(ctxt, func);
_dispatch_perfmon_workitem_inc();
_dispatch_thread_frame_pop(&dtf);
}
_dispatch_thread_frame_push
:任务加入队列_dispatch_client_callout
:执行任务_dispatch_thread_frame_pop
:任务移除队列
进入_dispatch_client_callout
函数
void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
_dispatch_get_tsd_base();
void *u = _dispatch_get_unwind_tsd();
if (likely(!u)) return f(ctxt);
_dispatch_set_unwind_tsd(NULL);
f(ctxt);
_dispatch_free_unwind_tsd();
_dispatch_set_unwind_tsd(u);
}
- 调用
f(ctxt)
就是内部自动调用block
任务的代码
通过lldb
进行验证
- 和源码中的逻辑完全一致
同步函数顺序被执行,因为_dispatch_sync_function_invoke_inline
中,三个函数的顺序调用:
_dispatch_thread_frame_push
:任务加入队列_dispatch_client_callout
:执行任务_dispatch_thread_frame_pop
:任务移除队列
当同步函数加入串行队列,在_dispatch_sync_f_inline
函数中,使用_dispatch_barrier_sync_f
同步栅栏函数
5.2 并发队列
同步函数配合并发队列
dispatch_queue_t queue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);
dispatch_sync(queue, ^{
NSLog(@"block---:%@",[NSThread currentThread]);
});
进入_dispatch_sync_f_inline
函数,并发队列触发的代码分支和全局队列有所不同
进入_dispatch_sync_invoke_and_complete
函数
static void
_dispatch_sync_invoke_and_complete(dispatch_lane_t dq, void *ctxt,
dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
_dispatch_sync_function_invoke_inline(dq, ctxt, func);
_dispatch_trace_item_complete(dc);
_dispatch_lane_non_barrier_complete(dq, 0);
}
- 最终调用的还是
_dispatch_sync_function_invoke_inline
函数
函数中的最后一个入参
dispatch_function_t func DISPATCH_TRACE_ARG(void *dc)
#define DISPATCH_TRACE_ARG(arg)
#define DISPATCH_TRACE_ARG(arg) , arg
- 将
,
和arg
封装到一起,所以外部不用加额外的逗号
5.3 死锁
- 当出现死锁异常时,调用
_dispatch_sync_f_slow
函数 - 然后调用
__DISPATCH_WAIT_FOR_QUEUE__
,抛出异常
探索流程:dispatch_sync
→_dispatch_sync_f
→_dispatch_sync_f_inline
→_dispatch_sync_f_inline
进入_dispatch_sync_f_inline
函数
- 当前为同步函数,并且加入串行队列,调用
_dispatch_barrier_sync_f
函数 - 同步函数加入串行队列的底层,使用同步栅栏函数
进入_dispatch_barrier_sync_f
函数
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
_dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}
进入_dispatch_barrier_sync_f_inline
函数
- 出现死锁异常时,已知的
_dispatch_sync_f_slow
函数
进入_dispatch_sync_f_slow
函数
- 内部调用
__DISPATCH_WAIT_FOR_QUEUE__
抛出异常
进入__DISPATCH_WAIT_FOR_QUEUE__
函数
- 当
_dq_state_drain_locked_by
函数返回真,抛出异常 - 传入两个参数,
dq_state
为队列状态,dsc_waiter
为线程的tid
进入_dq_state_drain_locked_by
函数
static inline bool
_dq_state_drain_locked_by(uint64_t dq_state, dispatch_tid tid)
{
return _dispatch_lock_is_locked_by((dispatch_lock)dq_state, tid);
}
进入_dispatch_lock_is_locked_by
函数
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
// equivalent to _dispatch_lock_owner(lock_value) == tid
return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}
- 找到判断依据
找到DLOCK_OWNER_MASK
定义
#define DLOCK_OWNER_MASK ((dispatch_lock)0xfffffffc)
DLOCK_OWNER_MASK
定位为很大值,只要一个非零的值和它进行&
运算,结果一定不为领lock_value
和tid
进行按位异或,它们运算结果为零只有一种可能,就是二者的值相同
当同步函数加入到串行队列中,同一线程在等待时又被执行,就会形成相互等待的局面,造成死锁的异常
6. 异步函数
在libdispatch
源码中,找到dispatch_async
函数的实现
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME;
dispatch_qos_t qos;
qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
进入_dispatch_continuation_init
函数
static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
dispatch_queue_class_t dqu, dispatch_block_t work,
dispatch_block_flags_t flags, uintptr_t dc_flags)
{
void *ctxt = _dispatch_Block_copy(work);
dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
if (unlikely(_dispatch_block_has_private_data(work))) {
dc->dc_flags = dc_flags;
dc->dc_ctxt = ctxt;
// will initialize all fields but requires dc_flags & dc_ctxt to be set
return _dispatch_continuation_init_slow(dc, dqu, flags);
}
dispatch_function_t func = _dispatch_Block_invoke(work);
if (dc_flags & DC_FLAG_CONSUME) {
func = _dispatch_call_block_and_release;
}
return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}
进入_dispatch_continuation_init_f
函数
static inline dispatch_qos_t
_dispatch_continuation_init_f(dispatch_continuation_t dc,
dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f,
dispatch_block_flags_t flags, uintptr_t dc_flags)
{
pthread_priority_t pp = 0;
dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED;
dc->dc_func = f;
dc->dc_ctxt = ctxt;
// in this context DISPATCH_BLOCK_HAS_PRIORITY means that the priority
// should not be propagated, only taken from the handler if it has one
if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
pp = _dispatch_priority_propagate();
}
_dispatch_continuation_voucher_set(dc, flags);
return _dispatch_continuation_priority_set(dc, dqu, pp, flags);
}
- 对
block
任务进行封装
进入_dispatch_continuation_priority_set
函数
static inline dispatch_qos_t
_dispatch_continuation_priority_set(dispatch_continuation_t dc,
dispatch_queue_class_t dqu,
pthread_priority_t pp, dispatch_block_flags_t flags)
{
dispatch_qos_t qos = DISPATCH_QOS_UNSPECIFIED;
#if HAVE_PTHREAD_WORKQUEUE_QOS
dispatch_queue_t dq = dqu._dq;
if (likely(pp)) {
bool enforce = (flags & DISPATCH_BLOCK_ENFORCE_QOS_CLASS);
bool is_floor = (dq->dq_priority & DISPATCH_PRIORITY_FLAG_FLOOR);
bool dq_has_qos = (dq->dq_priority & DISPATCH_PRIORITY_REQUESTED_MASK);
if (enforce) {
pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
qos = _dispatch_qos_from_pp_unsafe(pp);
} else if (!is_floor && dq_has_qos) {
pp = 0;
} else {
qos = _dispatch_qos_from_pp_unsafe(pp);
}
}
dc->dc_priority = pp;
#else
(void)dc; (void)dqu; (void)pp; (void)flags;
#endif
return qos;
}
- 对任务优先级进行封装
异步函数的执行,优先级是衡量标准的其中一项。而且任务根据CPU
的调度情况异步执行,所以一定是无序的,需要对其进行封装
6.2 并发队列
异步函数配合并发队列
dispatch_queue_t queue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(queue, ^{
NSLog(@"block---:%@",[NSThread currentThread]);
});
进入_dispatch_continuation_async
函数
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
_dispatch_trace_item_push(dqu, dc);
}
#else
(void)dc_flags;
#endif
return dx_push(dqu._dq, dc, qos);
}
找到dx_push
的宏定义
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
- 此时我们关注的是
参数3
的调用,可以暂时无视dx_vtable
,继续跟踪dq_push
dq_push
属于函数的赋值点,根据队列类型的不同赋值也不同
查看并发队列的赋值
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
.do_type = DISPATCH_QUEUE_CONCURRENT_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_lane_wakeup,
.dq_push = _dispatch_lane_concurrent_push,
);
进入_dispatch_lane_concurrent_push
函数
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
// <rdar://problem/24738102&24743140> reserving non barrier width
// doesn't fail if only the ENQUEUED bit is set (unlike its barrier
// width equivalent), so we have to check that this thread hasn't
// enqueued anything ahead of this call or we can break ordering
if (dq->dq_items_tail == NULL &&
!_dispatch_object_is_waiter(dou) &&
!_dispatch_object_is_barrier(dou) &&
_dispatch_queue_try_acquire_async(dq)) {
return _dispatch_continuation_redirect_push(dq, dou, qos);
}
_dispatch_lane_push(dq, dou, qos);
}
使用符号断点,先进入_dispatch_continuation_redirect_push
函数
static void
_dispatch_continuation_redirect_push(dispatch_lane_t dl,
dispatch_object_t dou, dispatch_qos_t qos)
{
if (likely(!_dispatch_object_is_redirection(dou))) {
dou._dc = _dispatch_async_redirect_wrap(dl, dou);
} else if (!dou._dc->dc_ctxt) {
// find first queue in descending target queue order that has
// an autorelease frequency set, and use that as the frequency for
// this continuation.
dou._dc->dc_ctxt = (void *)
(uintptr_t)_dispatch_queue_autorelease_frequency(dl);
}
dispatch_queue_t dq = dl->do_targetq;
if (!qos) qos = _dispatch_priority_qos(dq->dq_priority);
dx_push(dq, dou, qos);
}
再次调用dx_push
,此时队列类型为queue_pthread_root
,所以dx_push
不是_dispatch_lane_concurrent_push
函数,而是对应了_dispatch_root_queue_push
函数,类似于调用了父类的方法
通过符号断点配合汇编代码查看流程:
_dispatch_root_queue_push
→_dispatch_root_queue_push_override
→_dispatch_root_queue_poke
→_dispatch_root_queue_poke_slow
进入_dispatch_root_queue_poke_slow
函数
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
...
_dispatch_root_queues_init();
...
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
_dispatch_temporary_resource_shortage();
}
} while (--remaining);
...
}
- 通过
_dispatch_root_queues_init
注册异步任务执行的回调 - 通过
do...while
循环创建线程,使用pthread_create
函数
使用lldb
进行反推
- 由系统的
_pthread_wqthread
调用libdispatch
的_dispatch_worker_thread2
函数
源码中正向推导流程:
_dispatch_root_queues_init
→_dispatch_root_queues_init_once
→_dispatch_worker_thread2
对_dispatch_worker_thread2
进行赋值,封装给pthread
的API
调用
- 异步线程被
CPU
调度,系统在合适的时机,通过libsystem_pthread.dylib
的_pthread_wqthread
函数,调用libdispatch.dylib
的_dispatch_worker_thread2
函数,最终执行异步函数的任务
异步任务回调流程:
_dispatch_worker_thread2
→_dispatch_root_queue_drain
→_dispatch_async_redirect_invoke
→_dispatch_continuation_pop
→_dispatch_client_callout
→_dispatch_call_block_and_release
异步函数配合并发队列的逻辑:
首先对任务和优先级进行封装
多次调用dx_push,最终找到
_dispatch_root_queue_push
通过
_dispatch_root_queues_init
注册异步任务执行的回调通过
do...while
循环创建线程,使用pthread_create
函数异步线程被
CPU
调度,系统在合适的时机,通过libsystem_pthread.dylib
调用_dispatch_worker_thread2
函数,执行回调
6.3 全局队列
异步函数配全局队列
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_async(queue, ^{
NSLog(@"block---:%@",[NSThread currentThread]);
});
代码逻辑:dispatch_async
→_dispatch_continuation_async
→_dispatch_continuation_async
→dx_push
此时调用的dx_push
,和并发队列的赋值不同
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
.do_type = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
.do_dispose = _dispatch_object_no_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_object_no_invoke,
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_root_queue_wakeup,
.dq_push = _dispatch_root_queue_push,
);
进入_dispatch_root_queue_push
函数流程:
_dispatch_root_queue_push
→_dispatch_root_queue_push_override
→_dispatch_root_queue_push_inline
→_dispatch_root_queue_poke
→_dispatch_root_queue_poke_slow
进入_dispatch_root_queue_poke_slow
函数:
- 通过
_dispatch_root_queues_init
注册异步任务执行的回调 - 通过
do...while
循环创建线程,使用pthread_create
函数
使用lldb
进行反推
异步任务回调流程:
_dispatch_worker_thread2
→_dispatch_root_queue_drain
→_dispatch_queue_override_invoke
→_dispatch_client_callout
→_dispatch_call_block_and_release
7. 单例模式
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
NSLog(@"block---:%@",[NSThread currentThread]);
});
来到源码,找到dispatch_once
函数的实现
void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}
进入dispatch_once_f
函数
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
dispatch_once_gate_t l = (dispatch_once_gate_t)val;
#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
if (likely(v == DLOCK_ONCE_DONE)) {
return;
}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
if (likely(DISPATCH_ONCE_IS_GEN(v))) {
return _dispatch_once_mark_done_if_quiesced(l, v);
}
#endif
#endif
if (_dispatch_once_gate_tryenter(l)) {
return _dispatch_once_callout(l, ctxt, func);
}
return _dispatch_once_wait(l);
}
- 将
val
强转为dispatch_once_gate_t
类型,类似于栅栏的使用
三个条件分支:
如果执行完成,直接返回
如果第一次,执行
_dispatch_once_callout
函数如果正在执行,进入
_dispatch_once_wait
等待
7.1 锁的处理
进入_dispatch_once_gate_tryenter
函数
static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
(uintptr_t)_dispatch_lock_value_for_self(), relaxed);
}
- 进行原子锁的处理,防止多线程
7.2 执行任务
进入_dispatch_once_callout
函数
static void
_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
dispatch_function_t func)
{
_dispatch_client_callout(ctxt, func);
_dispatch_once_gate_broadcast(l);
}
void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
_dispatch_get_tsd_base();
void *u = _dispatch_get_unwind_tsd();
if (likely(!u)) return f(ctxt);
_dispatch_set_unwind_tsd(NULL);
f(ctxt);
_dispatch_free_unwind_tsd();
_dispatch_set_unwind_tsd(u);
}
- 通过
f(ctxt)
执行任务的回调
进入_dispatch_once_gate_broadcast
函数
static inline void
_dispatch_once_gate_broadcast(dispatch_once_gate_t l)
{
dispatch_lock value_self = _dispatch_lock_value_for_self();
uintptr_t v;
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
v = _dispatch_once_mark_quiescing(l);
#else
v = _dispatch_once_mark_done(l);
#endif
if (likely((dispatch_lock)v == value_self)) return;
_dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}
- 锁的处理,并标记为完成
单例模式的原理:
调用
dispatch_once
函数,传入onceToken
和block
。其中onceToken
为静态变量,具有唯一性,在底层被强转为dispatch_once_gate_t
类型的变量l
,l
通过os_atomic_load
函数获取底层原子封装性的关联,得到变量v
,通过v
来查询任务的状态,如果此时v
等于DLOCK_ONCE_DONE
,说明任务已经处理过一次了,直接return
如果任务首次执行,将任务进行加锁,任务状态置为
DLOCK_ONCE_UNLOCK
,目的保证线程安全。加锁后进行block
回调函数的执行,执行完成后,将当前任务解锁,将当前的任务状态置为DLOCK_ONCE_DONE
,在下次进来时,就不会在执行,会直接返回如果在当前任务执行期间,有其他任务进来,会进入无限次等待,原因是当前任务已经获取了锁,进行了加锁,其他任务是无法获取锁的
8. 线程池
8.1 创建线程
异步函数在_dispatch_root_queue_poke_slow
中,如果是全局队列,使用_pthread_workqueue_addthreads
函数创建并执行
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
#endif
{
_dispatch_root_queue_debug("requesting new worker thread for global "
"queue: %p", dq);
r = _pthread_workqueue_addthreads(remaining,
_dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
(void)dispatch_assume_zero(r);
return;
}
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
如果是普通队列,使用do...while
进行线程池的创建,在创建之前,还要对线程池的状态进行判断
int can_request, t_count;
// seq_cst with atomic store to tail <rdar://problem/16932833>
t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
do {
can_request = t_count < floor ? 0 : t_count - floor;
if (remaining > can_request) {
_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
remaining, can_request);
os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
remaining = can_request;
}
if (remaining == 0) {
_dispatch_root_queue_debug("pthread pool is full for root queue: "
"%p", dq);
return;
}
} while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
t_count - remaining, &t_count, acquire));
- 判断
dgq_thread_pool_size
,源码中标记为1
dgq_thread_pool_size
会根据逻辑自增,加到最大值为止remaining
和floor
为入参,传入1
和0
- 计算
can_request
线程数,如果t_count
小于floor
返回0
,否则返回t_count
减去floor
的差值 - 如果
remaining
线程数大于can_request
,pthread
线程池减少请求,以can_request
线程数为准 - 如果
remaining
为0
,表示根队列的pthread
线程池已满
使用pthread_create
函数,创建线程
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
_dispatch_temporary_resource_shortage();
}
} while (--remaining);
8.2 最大线程数
线程池最大线程数的设定
int thread_pool_size = DISPATCH_WORKQ_MAX_PTHREAD_COUNT;
#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
- 最大线程数设置
255
,但实际程序中开辟的线程数,不一定能达到这个最大值
官方文档中,辅助线程为512KB
,辅助线程允许的最小堆栈大小为16KB
,并且堆栈大小必须是4KB
的倍数
程序启动,系统给出的虚拟内存4GB
,用户态占3GB
,内核态占1GB
。但内核态的1GB
并不能全部用来开辟线程,所以最大线程数是未知的
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
#if DISPATCH_DEBUG
unsigned dwStackSize = 0;
#else
unsigned dwStackSize = 64 * 1024;
#endif
uintptr_t hThread = 0;
while (!(hThread = _beginthreadex(NULL, dwStackSize, _dispatch_worker_thread_thunk, dq, STACK_SIZE_PARAM_IS_A_RESERVATION, NULL))) {
if (errno != EAGAIN) {
(void)dispatch_assume(hThread);
}
_dispatch_temporary_resource_shortage();
}
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (_dispatch_mgr_sched.prio > _dispatch_mgr_sched.default_prio) {
(void)dispatch_assume_zero(SetThreadPriority((HANDLE)hThread, _dispatch_mgr_sched.prio) == TRUE);
}
#endif
CloseHandle((HANDLE)hThread);
} while (--remaining);
- 按照内核态
1GB
满载,最小堆栈大小为16KB
计算,最大线程数可开辟64 * 1024
。按照辅助线程512KB
计算,最大线程数可开辟2048
9. 栅栏函数
iOS
中有两种栅栏函数,都是用于控制任务的执⾏顺序
dispatch_barrier_async
:异步栅栏函数,前面的任务执行完毕才会来到这里dispatch_barrier_sync
:同步栅栏函数,和异步栅栏函数的作用相同,但是同步栅栏函数会堵塞线程,影响后面的任务执行
使用栅栏函数的注意事项:
栅栏函数只能控制同一并发队列
同步栅栏函数添加队列,当前线程会被锁死,直到栅栏之前的任务和栅栏本身的任务执行完毕,当前线程才会继续执行
全局并发队列不支持栅栏函数,因为可能会干扰系统级的任务执行
如果是串行队列,使用栅栏函数的作用等同于一个同步函数,没有任何意义
栅栏函数还可用于线程安全,类似于锁的作用
dispatch_queue_t concurrentQueue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);
for (int i = 0; i<10000; i++) {
dispatch_async(concurrentQueue, ^{
dispatch_barrier_async(concurrentQueue, ^{
[self.mArray addObject:[NSString stringWithFormat:@"%d",i]];
});
// @synchronized (self) {
// [self.mArray addObject:[NSString stringWithFormat:@"%d",i]];
// }
});
}
- 此案例,如果不加栅栏函数,也不加互斥锁,使用并发队列多线程对同一数组进行
addObject
,很有可能会发生崩溃 - 因为数据的写入,本质是对旧值的
release
,对新值的retain
- 当数据不断
release
和retain
时,多线程会造成数据还没有retain
完毕,就开始进行release
,相当于加入空数据,进行release
9.1 同步栅栏函数分析
源码中,找到dispatch_barrier_sync
函数的实现
void
dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work)
{
uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK;
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
}
_dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}}
_dispatch_barrier_sync_f
→_dispatch_barrier_sync_f_inline
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
dispatch_tid tid = _dispatch_tid_self();
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// The more correct thing to do would be to merge the qos of the thread
// that just acquired the barrier lock into the queue state.
//
// However this is too expensive for the fast path, so skip doing it.
// The chosen tradeoff is that if an enqueue on a lower priority thread
// contends with this fast path, this thread may receive a useless override.
//
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
DC_FLAG_BARRIER | dc_flags);
}
if (unlikely(dl->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func,
DC_FLAG_BARRIER | dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
- 逻辑中存在进入
_dispatch_sync_f_slow
函数的代码,证明同步栅栏函数也可能出现死锁的情况
_dispatch_sync_recurse
→_dispatch_sync_invoke_and_complete_recurse
→_dispatch_sync_complete_recurse
static void
_dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq,
uintptr_t dc_flags)
{
bool barrier = (dc_flags & DC_FLAG_BARRIER);
do {
if (dq == stop_dq) return;
if (barrier) {
dx_wakeup(dq, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
} else {
_dispatch_lane_non_barrier_complete(upcast(dq)._dl, 0);
}
dq = dq->do_targetq;
barrier = (dq->dq_width == 1);
} while (unlikely(dq->do_targetq));
}
- 判断
targetq
,存在栅栏调用dx_wakeup
等待 - 否则,调用
_dispatch_lane_non_barrier_complete
函数
并发队列的dx_wakeup
实现
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
.do_type = DISPATCH_QUEUE_CONCURRENT_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_lane_wakeup,
.dq_push = _dispatch_lane_concurrent_push,
);
进入_dispatch_lane_wakeup
函数
void
_dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags)
{
dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
return _dispatch_lane_barrier_complete(dqu, qos, flags);
}
if (_dispatch_queue_class_probe(dqu)) {
target = DISPATCH_QUEUE_WAKEUP_TARGET;
}
return _dispatch_queue_wakeup(dqu, qos, flags, target);
}
- 针对栅栏函数进行判断,进入
_dispatch_lane_barrier_complete
函数
进入_dispatch_lane_barrier_complete
函数
static void
_dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags)
{
dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
dispatch_lane_t dq = dqu._dl;
if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
struct dispatch_object_s *dc = _dispatch_queue_get_head(dq);
if (likely(dq->dq_width == 1 || _dispatch_object_is_barrier(dc))) {
if (_dispatch_object_is_waiter(dc)) {
return _dispatch_lane_drain_barrier_waiter(dq, dc, flags, 0);
}
} else if (dq->dq_width > 1 && !_dispatch_object_is_barrier(dc)) {
return _dispatch_lane_drain_non_barriers(dq, dc, flags);
}
if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
_dispatch_retain_2(dq);
flags |= DISPATCH_WAKEUP_CONSUME_2;
}
target = DISPATCH_QUEUE_WAKEUP_TARGET;
}
uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned);
}
- 如果是串行队列,栅栏相当于同步函数,调用
_dispatch_lane_drain_barrier_waiter
函数 - 如果是并发队列,调用
_dispatch_lane_drain_non_barriers
函数,进行栅栏相关处理 - 栅栏之前的任务全部完成,调用
_dispatch_lane_class_barrier_complete
函数
9.2 全局队列中的栅栏函数
全局队列的dx_wakeup
实现
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
.do_type = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
.do_dispose = _dispatch_object_no_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_object_no_invoke,
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_root_queue_wakeup,
.dq_push = _dispatch_root_queue_push,
);
进入_dispatch_root_queue_wakeup
函数
void
_dispatch_root_queue_wakeup(dispatch_queue_global_t dq,
DISPATCH_UNUSED dispatch_qos_t qos, dispatch_wakeup_flags_t flags)
{
if (!(flags & DISPATCH_WAKEUP_BLOCK_WAIT)) {
DISPATCH_INTERNAL_CRASH(dq->dq_priority,
"Don't try to wake up or override a root queue");
}
if (flags & DISPATCH_WAKEUP_CONSUME_2) {
return _dispatch_release_2_tailcall(dq);
}
}
- 全局队列中,没有对栅栏函数的任何判断和处理。所以,栅栏函数在全局队列中,和普通的同步或异步函数别无二致
10. 信号量
信号量可以让异步任务同步执行,可以当锁使用,并且能够控制GCD
最大并发数
dispatch_semaphore_t sem = dispatch_semaphore_create(0);
for (int i = 0; i < 10; i++) {
dispatch_async(queue, ^{
sleep(1);
NSLog(@"当前 - %d, 线程 - %@", i, [NSThread currentThread]);
dispatch_semaphore_signal(sem);
});
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
}
dispatch_semaphore_create
:初始化信号量,设置GCD
最大并发数,必须>= 0
dispatch_semaphore_wait
:等待,对信号量减1
,相当于加锁dispatch_semaphore_signal
:释放,对信号量加1
,相当于解锁
10.1 创建
进入dispatch_semaphore_create
函数
dispatch_semaphore_t
dispatch_semaphore_create(intptr_t value)
{
dispatch_semaphore_t dsema;
// If the internal value is negative, then the absolute of the value is
// equal to the number of waiting threads. Therefore it is bogus to
// initialize the semaphore with a negative value.
if (value < 0) {
return DISPATCH_BAD_INPUT;
}
dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
sizeof(struct dispatch_semaphore_s));
dsema->do_next = DISPATCH_OBJECT_LISTLESS;
dsema->do_targetq = _dispatch_get_default_queue(false);
dsema->dsema_value = value;
_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
dsema->dsema_orig = value;
return dsema;
}
- 初始化信号量,设置
GCD
最大并发数 - 最大并发数必须
>= 0
10.2 等待
进入dispatch_semaphore_wait
函数
intptr_t
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
long value = os_atomic_dec2o(dsema, dsema_value, acquire);
if (likely(value >= 0)) {
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout);
}
os_atomic_dec2o
宏,进行减1
操作- 若信号量
>= 0
,直接返回0
,执行wait
之后的代码 - 若信号量
< 0
,将阻塞当前线程,进入_dispatch_semaphore_wait_slow
函数
进入_dispatch_semaphore_wait_slow
函数
static intptr_t
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
dispatch_time_t timeout)
{
long orig;
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
switch (timeout) {
default:
if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
case DISPATCH_TIME_NOW:
orig = dsema->dsema_value;
while (orig < 0) {
if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1,
&orig, relaxed)) {
return _DSEMA4_TIMEOUT();
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
_dispatch_sema4_wait(&dsema->dsema_sema);
break;
}
return 0;
}
- 根据
timeout
的值进行不同的逻辑处理
如果为DISPATCH_TIME_FOREVER
类型,进入_dispatch_sema4_wait
函数
void
_dispatch_sema4_wait(_dispatch_sema4_t *sema)
{
int ret = 0;
do {
ret = sem_wait(sema);
} while (ret == -1 && errno == EINTR);
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
}
- 核心代码为
do...while
,通过循环使得下面的代码无法执行
10.3 释放
进入dispatch_semaphore_signal
函数
intptr_t
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
long value = os_atomic_inc2o(dsema, dsema_value, release);
if (likely(value > 0)) {
return 0;
}
if (unlikely(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH(value,
"Unbalanced call to dispatch_semaphore_signal()");
}
return _dispatch_semaphore_signal_slow(dsema);
}
os_atomic_inc2o
宏,进行加1
操作- 若信号量
> 0
,直接返回0
,继续执行后续代码 - 若信号量等于
LONG_MIN
,抛出异常。这种情况表示wait
操作过多,二者之间无法匹配。之后会调用_dispatch_semaphore_signal_slow
函数,进入延迟等待
11. 调度组
调度组最直接的作⽤:控制任务执⾏顺序
dispatch_group_create
:创建组dispatch_group_async
:进组任务dispatch_group_notify
:进组任务执行完毕通知dispatch_group_wait
:进组任务执行等待时间dispatch_group_enter
:进组dispatch_group_leave
:出组
使用进组任务
- (void)testGCD{
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_group_async(group, queue, ^{
NSLog(@"接口1");
});
dispatch_group_async(group, queue, ^{
NSLog(@"接口2");
});
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
NSLog(@"刷新");
});
}
使用进组、出组
- (void)testGCD{
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_group_enter(group);
dispatch_async(queue, ^{
NSLog(@"接口1");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(queue, ^{
NSLog(@"接口2");
dispatch_group_leave(group);
});
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
NSLog(@"刷新");
});
}
dispatch_group_enter
和dispatch_group_leave
必须成对使用,否则会一直等待或出现异常
11.1 创建组
进入dispatch_group_create
函数
dispatch_group_t
dispatch_group_create(void)
{
return _dispatch_group_create_with_count(0);
}
进入_dispatch_group_create_with_count
函数
static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
sizeof(struct dispatch_group_s));
dg->do_next = DISPATCH_OBJECT_LISTLESS;
dg->do_targetq = _dispatch_get_default_queue(false);
if (n) {
os_atomic_store2o(dg, dg_bits,
(uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
}
return dg;
}
- 创建
dispatch_group_t
结构体,参数n
默认传入0
11.2 进组
进入dispatch_group_enter
函数
void
dispatch_group_enter(dispatch_group_t dg)
{
// The value is decremented on a 32bits wide atomic so that the carry
// for the 0 -> -1 transition is not propagated to the upper 32bits.
uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
DISPATCH_GROUP_VALUE_INTERVAL, acquire);
uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
if (unlikely(old_value == 0)) {
_dispatch_retain(dg); // <rdar://problem/22318411>
}
if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
DISPATCH_CLIENT_CRASH(old_bits,
"Too many nested calls to dispatch_group_enter()");
}
}
- 使用
os_atomic_sub_orig2o
宏,对dg_bits
进行减1
操作 old_bits
只可能是-1
和0
两种可能old_bits
和DISPATCH_GROUP_VALUE_MASK
进行&
运算,将结果赋值给old_value
- 如果
old_bits
为0
,old_value
为0
,调用_dispatch_retain
函数 - 如果
old_bits
为-1
,old_value
为DISPATCH_GROUP_VALUE_MASK
,表示进组和出组函数使用不平衡,报出异常
- 如果
11.3 出组
进入dispatch_group_leave
函数
void
dispatch_group_leave(dispatch_group_t dg)
{
// The value is incremented on a 64bits wide atomic so that the carry for
// the -1 -> 0 transition increments the generation atomically.
uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
DISPATCH_GROUP_VALUE_INTERVAL, release);
uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
old_state += DISPATCH_GROUP_VALUE_INTERVAL;
do {
new_state = old_state;
if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
} else {
// If the group was entered again since the atomic_add above,
// we can't clear the waiters bit anymore as we don't know for
// which generation the waiters are for
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
}
if (old_state == new_state) break;
} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
old_state, new_state, &old_state, relaxed)));
return _dispatch_group_wake(dg, old_state, true);
}
if (unlikely(old_value == 0)) {
DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
"Unbalanced call to dispatch_group_leave()");
}
}
- 使用
os_atomic_add_orig2o
宏,进行加1
操作 &
运算后的旧值等于DISPATCH_GROUP_VALUE_1
,等待do...while
停止循环,调用_dispatch_group_wake
函数DISPATCH_GROUP_VALUE_1
等同于DISPATCH_GROUP_VALUE_MASK
- 如果旧值为
0
,表示进组和出组函数使用不平衡,报出异常
进入_dispatch_group_wake
函数
static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
dispatch_continuation_t dc, next_dc, tail;
// Snapshot before anything is notified/woken <rdar://problem/8554546>
dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
do {
dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
_dispatch_continuation_async(dsn_queue, dc,
_dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
_dispatch_release(dsn_queue);
} while ((dc = next_dc));
refs++;
}
if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
_dispatch_wake_by_address(&dg->dg_gen);
}
if (refs) _dispatch_release_n(dg, refs);
}
- 函数的作用,唤醒
dispatch_group_notify
函数 - 核心代码在
do...while
循环中,调用_dispatch_continuation_async
函数
进入_dispatch_continuation_async
函数
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
_dispatch_trace_item_push(dqu, dc);
}
#else
(void)dc_flags;
#endif
return dx_push(dqu._dq, dc, qos);
}
- 最终调用
dx_push
11.4 通知
进入dispatch_group_notify
函数
static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dsn)
{
uint64_t old_state, new_state;
dispatch_continuation_t prev;
dsn->dc_data = dq;
_dispatch_retain(dq);
prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
if (os_mpsc_push_was_empty(prev)) {
os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
if ((uint32_t)old_state == 0) {
os_atomic_rmw_loop_give_up({
return _dispatch_group_wake(dg, new_state, false);
});
}
});
}
}
- 判断状态为
0
,调用_dispatch_group_wake
函数 - 所以通知并不需要一直等待,因为
dispatch_group_notify
和dispatch_group_leave
中,都有_dispatch_group_wake
函数的调用
11.5 任务
进入dispatch_group_async
函数
void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_block_t db)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
dispatch_qos_t qos;
qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
_dispatch_continuation_group_async(dg, dq, dc, qos);
}
进入_dispatch_continuation_group_async
函数
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dc, dispatch_qos_t qos)
{
dispatch_group_enter(dg);
dc->dc_data = dg;
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
- 调用
dispatch_group_enter
函数,进行进组操作
源码中搜索_dispatch_client_callout
函数,在_dispatch_continuation_with_group_invoke
函数中,同样调用dispatch_group_leave
函数,进行出组操作
static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
struct dispatch_object_s *dou = dc->dc_data;
unsigned long type = dx_type(dou);
if (type == DISPATCH_GROUP_TYPE) {
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
_dispatch_trace_item_complete(dc);
dispatch_group_leave((dispatch_group_t)dou);
} else {
DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
}
}
12. dispatch_source
dispatch_source
是基础数据类型,用于协调特定底层系统事件的处理
12.1 基本介绍
dispatch_source
替代了异步回调函数,来处理系统相关的事件。当配置一个dispatch
时,你需要指定监测的事件、队列、以及任务回调。当事件发生时,dispatch source
会提交block
或函数到指定的queue
去执行
使用dispatch_source
代替dispatch_async
的原因在于联结的优势
联结:在任一线程上调用它的一个函数dispatch_source_merge_data
后,会执行Dispatch Source
事先定义好的句柄(可以把句柄简单理解为一个block
),这个过程叫Custom event
,用户事件。是dispatch source
支持处理的一种事件
句柄:是一种指向指针的指针,它指向的就是一个类或者结构,它和系统有密切的关系,包含:
- 实例句柄
HINSTANCE
- 位图句柄
HBITMAP
- 设备表句柄
HDC
- 图标句柄
HICON
- 通用句柄
HANDLE
简单来说:这种事件是由你调用dispatch_source_merge_data
函数来向自己发出的信号
使用dispatch_source
的优点:
- 其
CPU
负荷非常小,尽量不占用资源 - 联结的优势
12.2 创建dispatch source
使用dispatch_source_create
函数
dispatch_source_t source = dispatch_source_create(dispatch_source_type_t type, uintptr_t handle, unsigned long mask, dispatch_queue_t queue)
type
:dispatch
源可处理的事件handle
:可以理解为句柄、索引或ID
,假如要监听进程,需要传入进程ID
mask
:可以理解为描述,提供更详细的描述,让它知道具体要监听什么queue
:自定义源需要的一个队列,用来处理所有的响应句柄
dispatch source
中type
的类型:
DISPATCH_SOURCE_TYPE_DATA_ADD
:自定义的事件,变量增加DISPATCH_SOURCE_TYPE_DATA_OR
:自定义的事件,变量OR
DISPATCH_SOURCE_TYPE_MACH_SENDMACH
:端口发送DISPATCH_SOURCE_TYPE_MACH_RECVMACH
:端口接收DISPATCH_SOURCE_TYPE_MEMORYPRESSURE
:内存压力 (注:iOS8
后可用)DISPATCH_SOURCE_TYPE_PROC
:进程监听,如进程的退出、创建一个或更多的子线程、进程收到UNIX信号DISPATCH_SOURCE_TYPE_READ
:IO
操作,如对文件的操作、socket
操作的读响应DISPATCH_SOURCE_TYPE_SIGNAL
:接收到UNIX
信号时响应DISPATCH_SOURCE_TYPE_TIMER
:定时器DISPATCH_SOURCE_TYPE_VNODE
:文件状态监听,文件被删除、移动、重命名DISPATCH_SOURCE_TYPE_WRITE
:IO
操作,如对文件的操作、socket
操作的写响应
12.3 常用API
//挂起队列
dispatch_suspend(queue)
//分派源创建时默认处于暂停状态,在分派源分派处理程序之前必须先恢复
dispatch_resume(source)
//向分派源发送事件,需要注意的是,不可以传递0值(事件不会被触发),同样也不可以传递负数。
dispatch_source_merge_data
//设置响应分派源事件的block,在分派源指定的队列上运行
dispatch_source_set_event_handler
//得到分派源的数据
dispatch_source_get_data
//得到dispatch源创建,即调用dispatch_source_create的第二个参数
uintptr_t dispatch_source_get_handle(dispatch_source_t source);
//得到dispatch源创建,即调用dispatch_source_create的第三个参数
unsigned long dispatch_source_get_mask(dispatch_source_t source);
////取消dispatch源的事件处理--即不再调用block。如果调用dispatch_suspend只是暂停dispatch源。
void dispatch_source_cancel(dispatch_source_t source);
//检测是否dispatch源被取消,如果返回非0值则表明dispatch源已经被取消
long dispatch_source_testcancel(dispatch_source_t source);
//dispatch源取消时调用的block,一般用于关闭文件或socket等,释放相关资源
void dispatch_source_set_cancel_handler(dispatch_source_t source, dispatch_block_t cancel_handler);
//可用于设置dispatch源启动时调用block,调用完成后即释放这个block。也可在dispatch源运行当中随时调用这个函数。
void dispatch_source_set_registration_handler(dispatch_source_t source, dispatch_block_t registration_handler);
12.4 timer
的封装
打开SparkTimer.h
文件,写入以下代码:
#import <Foundation/Foundation.h>
@class SparkTimer;
typedef void (^TimerBlock)(SparkTimer * _Nonnull timer);
@interface SparkTimer : NSObject
+ (SparkTimer *)scheduledTimerWithTimeInterval:(NSTimeInterval)ti target:(id)aTarget selector:(SEL)aSelector userInfo:(nullable id)userInfo repeats:(BOOL)yesOrNo immediately:(BOOL)isImmediately;
+ (SparkTimer *)scheduledTimerWithTimeInterval:(NSTimeInterval)ti userInfo:(nullable id)userInfo repeats:(BOOL)yesOrNo immediately:(BOOL)isImmediately timerBlock:(TimerBlock)block;
- (void)invalidate;
@property (nonatomic, readonly, getter=isValid) BOOL valid;
@property (nonatomic, nullable, readonly, retain) id userInfo;
@end
打开SparkTimer.m
文件,写入以下代码:
#import "SparkTimer.h"
@interface SparkTimer ()
@property (nonatomic, assign) NSTimeInterval interval;
@property (nonatomic, nullable, readwrite, retain) id userInfo;
@property (nonatomic, assign) BOOL repeats;
@property (nonatomic, assign) BOOL immediately;
@property (nonatomic, strong) dispatch_source_t timer;
@property (nonatomic, readwrite, getter=isValid) BOOL valid;
@end
@implementation SparkTimer
+ (SparkTimer *)scheduledTimerWithTimeInterval:(NSTimeInterval)ti target:(id)aTarget selector:(SEL)aSelector userInfo:(nullable id)userInfo repeats:(BOOL)yesOrNo immediately:(BOOL)isImmediately{
return [[SparkTimer alloc] initWithInterval:ti target:aTarget selector:aSelector userInfo:userInfo repeats:yesOrNo immediately:isImmediately timerBlock:nil isBlock:NO];
}
+ (SparkTimer *)scheduledTimerWithTimeInterval:(NSTimeInterval)ti userInfo:(nullable id)userInfo repeats:(BOOL)yesOrNo immediately:(BOOL)isImmediately timerBlock:(TimerBlock)block{
return [[SparkTimer alloc] initWithInterval:ti target:nil selector:nil userInfo:userInfo repeats:yesOrNo immediately:isImmediately timerBlock:block isBlock:YES];
}
- (instancetype)initWithInterval:(NSTimeInterval)ti target:(id)aTarget selector:(SEL)aSelector userInfo:(nullable id)userInfo repeats:(BOOL)yesOrNo immediately:(BOOL)isImmediately timerBlock:(TimerBlock)block isBlock:(BOOL)isBlock
{
self = [super init];
if (self) {
_interval = ti;
_userInfo = userInfo;
_repeats = yesOrNo;
_immediately = isImmediately;
_valid = NO;
@weakify(self)
[self createTimer:^{
@strongify(self)
if(!isBlock){
[self callOutWithTarget:aTarget selector:aSelector];
return;
}
[self callOutWithTimerBlock:block];
}];
}
return self;
}
- (void)callOutWithTarget:(id)aTarget selector:(SEL)aSelector{
if(!aTarget || !aSelector){
[self invalidate];
return;
}
[aTarget performSelector:aSelector withObject:self];
[self checkRepeats];
}
- (void)callOutWithTimerBlock:(TimerBlock)block{
if(!block){
[self invalidate];
return;
}
block(self);
[self checkRepeats];
}
- (void)checkRepeats{
if(self.repeats){
return;
}
[self invalidate];
}
- (void)createTimer:(void(^)(void))block{
_valid = YES;
//1.创建队列
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
//2.创建timer
_timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
dispatch_time_t start;
if(self.immediately){
start = DISPATCH_TIME_NOW;
}
else{
start = dispatch_time(DISPATCH_TIME_NOW, self.interval * NSEC_PER_SEC);
}
//3.设置timer首次执行时间,间隔,精确度
dispatch_source_set_timer(_timer, start, self.interval * NSEC_PER_SEC, 0.1 * NSEC_PER_SEC);
//4.设置timer事件回调
dispatch_source_set_event_handler(_timer, ^{
NSLog(@"计时");
block();
});
//5.默认是挂起状态,需要手动激活
dispatch_resume(_timer);
}
- (void)invalidate{
if(!self.isValid){
return;
}
_valid = NO;
dispatch_source_cancel(_timer);
}
- (id)userInfo{
return _userInfo;
}
@end
总结
创建队列:
- 串行队列的
dq_atomic_flags
为1
dq_serialnum
:标记队列类型- 队列也是对象,也有
isa
指向 - 队列通过模板创建
同步函数:
- 当同步函数加入到串行队列中,同一线程在等待时又被执行,就会形成相互等待的局面,造成死锁的异常
异步函数:
- 通过
dx_push
递归,会重定向到根队列,然后通过pthread_creat
创建线程,最后通过dx_invoke
执行block
回调
单例模式:
- 底层包含锁的处理,线程安全
线程池:
- 使用
pthread_create
函数,创建线程 - 最大线程数是未知的
- 如果按照内核态
1GB
满载,最小堆栈大小为16KB
计算,最大线程数可开辟64 * 1024
。按照辅助线程512KB
计算,最大线程数可开辟2048
栅栏函数:
- 异步栅栏函数阻塞的是队列,而且必须是自定义的并发队列,不影响主线程任务的执行
- 同步栅栏函数阻塞的是线程,且是主线程,会影响主线程其他任务的执行
信号量:
dispatch_semaphore_create
:初始化信号量,设置GCD
最大并发数,必须>= 0
dispatch_semaphore_wait
:等待,对信号量减1
,相当于加锁- 若信号量
>= 0
,直接返回0
,执行wait
之后的代码 - 若信号量
< 0
,将阻塞当前线程,进入_dispatch_semaphore_wait_slow
函数
- 若信号量
dispatch_semaphore_signal
:释放,对信号量加1
,相当于解锁
调度组:
dispatch_group_enter
和dispatch_group_leave
必须成对使用dispatch_group_enter
:进行减1
操作,执行后为-1
dispatch_group_leave
:进行加1
操作,执行后为0
dispatch_group_notify
:判断状态为0
,调用_dispatch_group_wake
函数- 通知函数不需要一直等待,因为在
dispatch_group_leave
中也有_dispatch_group_wake
函数的调用,同样可以唤醒block
任务
- 通知函数不需要一直等待,因为在
dispatch_group_async
:内部封装了dispatch_group_enter
和dispatch_group_leave