多线程

访问此 博客文章 以了解 Julia 多线程特性。

启用Julia多线程

Julia 默认启动一个线程执行代码,这点可以通过 Threads.nthreads() 来确认:

  1. julia> Threads.nthreads()
  2. 1

执行线程的数量通过使用-t/--threads 命令行参数或使用JULIA_NUM_THREADS 环境变量。 当两者都被指定时,-t/--threads 优先级更高。

!!! compat “Julia 1.5” -t/--threads 命令行参数至少需要 Julia 1.5。在旧版本中,你必须改用环境变量。

让我们以4个线程启动Julia

  1. $ julia --threads 4

现在确认下确实有4个线程:

  1. julia> Threads.nthreads()
  2. 4

不过我们现在是在 master 线程,用 Threads.threadid 确认下:

  1. julia> Threads.threadid()
  2. 1

!!! note 如果你更喜欢使用环境变量,可以按如下方式设置它 Bash (Linux/macOS):

  1. ```bash
  2. export JULIA_NUM_THREADS=4
  3. ```
  4. C shell on Linux/macOS, CMD on Windows:
  5. ```bash
  6. set JULIA_NUM_THREADS=4
  7. ```
  8. Powershell on Windows:
  9. ```powershell
  10. $env:JULIA_NUM_THREADS=4
  11. ```
  12. Note that this must be done *before* starting Julia.

!!! note 使用 -t/--threads 指定的线程数传播到使用 -p/--procs--machine-file 命令行选项产生的工作进程。 例如,julia -p2 -t2 产生 1 个主进程和 2 个工作进程,并且所有三个进程都启用了 2 个线程。 要对工作线程进行更细粒度的控制,请使用 addprocs 并将 -t/--threads 作为 exeflags 传递。

数据竞争自由

你有责任确保程序没有数据竞争,如果你不遵守该要求,则不能假设这里承诺的任何内容。 观察到的结果可能是反直觉的。

为了确保这一点,最好的办法是获取多线程同时访问的数据的锁。 例如,在大多数情况下,你应该使用以下代码模板:

  1. julia> lock(lk) do
  2. use(a)
  3. end
  4. julia> begin
  5. lock(lk)
  6. try
  7. use(a)
  8. finally
  9. unlock(lk)
  10. end
  11. end

其中 lk 是一个锁(例如 ReentrantLock()), a 是数据。

此外,Julia 在出现数据竞争时不是内存安全的。如果另一个线程可能会写入数据,则在读取任何数据时都要非常小心! 相反,在更改其他线程访问的数据(例如分配给全局或闭包变量)时,请始终使用上述锁模式。

  1. Thread 1:
  2. global b = false
  3. global a = rand()
  4. global b = true
  5. Thread 2:
  6. while !b; end
  7. bad_read1(a) # it is NOT safe to access `a` here!
  8. Thread 3:
  9. while !@isdefined(a); end
  10. bad_read2(a) # it is NOT safe to access `a` here

@threads

下面用一个简单的例子测试我们原生的线程,首先创建一个全零的数组:

  1. julia> a = zeros(10)
  2. 10-element Vector{Float64}:
  3. 0.0
  4. 0.0
  5. 0.0
  6. 0.0
  7. 0.0
  8. 0.0
  9. 0.0
  10. 0.0
  11. 0.0
  12. 0.0

现在用4个线程模拟操作这个数组,每个线程往对应的位置写入线程ID。

Julia 用 Threads.@threads 宏实现并行循环,该宏加在 for 循环前面,提示 Julia 循环部分是一个多线程的区域:

  1. julia> Threads.@threads for i = 1:10
  2. a[i] = Threads.threadid()
  3. end

根据线程调度,迭代在各线程中进行拆分,之后各线程将自己的线程ID写入对应区域。

  1. julia> a
  2. 10-element Vector{Float64}:
  3. 1.0
  4. 1.0
  5. 1.0
  6. 2.0
  7. 2.0
  8. 2.0
  9. 3.0
  10. 3.0
  11. 4.0
  12. 4.0

注意 Threads.@threads 并没有一个像 @distributed 一样的可选的 reduction 参数。

原子操作

Julia 支持访问和修改值的原子操作,即以一种线程安全的方式来避免竞态条件。一个值(必须是基本类型的,primitive type)可以通过 Threads.Atomic 来包装起来从而支持原子操作。下面看个例子:

  1. julia> i = Threads.Atomic{Int}(0);
  2. julia> ids = zeros(4);
  3. julia> old_is = zeros(4);
  4. julia> Threads.@threads for id in 1:4
  5. old_is[id] = Threads.atomic_add!(i, id)
  6. ids[id] = id
  7. end
  8. julia> old_is
  9. 4-element Vector{Float64}:
  10. 0.0
  11. 1.0
  12. 7.0
  13. 3.0
  14. julia> i[]
  15. 10
  16. julia> ids
  17. 4-element Vector{Float64}:
  18. 1.0
  19. 2.0
  20. 3.0
  21. 4.0

如果不加 Atomic 的话,那么会因为竞态条件而得到错误的结果,下面是一个没有避免竞态条件的例子:

  1. julia> using Base.Threads
  2. julia> nthreads()
  3. 4
  4. julia> acc = Ref(0)
  5. Base.RefValue{Int64}(0)
  6. julia> @threads for i in 1:1000
  7. acc[] += 1
  8. end
  9. julia> acc[]
  10. 926
  11. julia> acc = Atomic{Int64}(0)
  12. Atomic{Int64}(0)
  13. julia> @threads for i in 1:1000
  14. atomic_add!(acc, 1)
  15. end
  16. julia> acc[]
  17. 1000

field粒度的原子操作

我们还可以使用@atomic@atomicswap@atomicreplace 宏在更细粒度的级别上使用原子。

内存模型的具体细节和设计的其他细节写在Julia Atomics Manifesto中,稍后将正式发布。

struct 声明中的任何字段都可以用 @atomic 修饰,然后任何写入也必须用 @atomic 标记,并且必须使用定义的原子顺序之一(:monotonic、:acquire、:release、:acquire _release 或 :sequentially_consistent)。 对原子字段的任何读取也可以使用原子排序约束进行注释,或者如果未指定,将使用单调(宽松)排序完成。

!!! compat “Julia 1.7” field粒度的原子操作至少需要 Julia 1.7.

副作用和可变的函数参数

使用多线程时,我们必须小心使用非 的函数,因为我们可能会得到错误的答案。 例如,按照惯例具有 名称以! 结尾 的函数会修改它们的参数,因此不是纯函数。

@threadcall

外部库,例如通过 ccall 调用的库,给 Julia 基于任务的 I/O 机制带来了问题。 如果 C 库执行阻塞操作,这会阻止 Julia 调度程序执行任何其他任务,直到调用返回。(例外情况是调用回调到 Julia 的自定义 C 代码,然后它可能会 yield,或者调用 jl_yield() 的 C 代码,jl_yieldyield 的 C 等价物。)

@threadcall 宏提供了一种避免在这种情况下停止执行的方法。它调度一个 C 函数以在单独的线程中执行。为此使用默认大小为 4 的线程池。线程池的大小由环境变量UV_THREADPOOL_SIZE控制。 在等待空闲线程时,以及一旦线程可用后的函数执行期间,请求任务(在主 Julia 事件循环上)让步给其他任务。 注意,@threadcall 在执行完成之前不会返回。 因此,从用户的角度来看,它与其他 Julia API 一样是一个阻塞调用。

非常关键的一点是,被调用的函数不会再调用回 Julia。

@threadcall 在 Julia 未来的版本中可能会被移除或改变。

注意!

此时,如果用户代码没有数据竞争,Julia 运行时和标准库中的大多数操作都可以以线程安全的方式使用。 然而,在某些领域,稳定线程支持的工作正在进行中。多线程编程有许多内在的困难,如果使用线程的程序表现出异常或与预期不符的行为(例如崩溃或神秘的结果),通常应该首先怀疑线程交互。

在 Julia 中使用线程时需要注意以下这些特定的限制和警告:

  • 如果多个线程同时使用基本容器类型,且至少有一个线程修改容器时,需要手动加锁(常见示例包括 push! 数组,或将项插入 Dict)。
  • 任务开始在某个线程上运行后(例如通过@spawn),它会在阻塞后始终在同一线程上重新启动。 将来这个限制将被移除,任务会在线程之间迁移。
  • @threads 当前使用静态调度,使用所有线程并为每个线程分配相等的迭代计数。将来,默认时间表可能会更改为动态的。
  • @spawn 使用的时间表是不确定的,不应依赖。
  • 计算绑定、非内存分配任务可以防止垃圾回收在其他正在分配内存的线程中运行。 在这些情况下,可能需要手动调用 GC.safepoint() 以允许 GC 运行。
  1. 该限制在未来会被移除。
  • 避免并行运行顶层操作,例如,includeeval 评估类型、方法和模块定义。

  • 请注意,如果启用线程,则库注册的终结器可能会中断。 这可能需要在整个生态系统中进行一些过渡工作,然后才能放心地广泛采用线程。 有关更多详细信息,请参阅下一节。

终结器的安全使用

因为终结器可以中断任何代码,所以它们在如何与任何全局状态交互时必须非常小心。 不幸的是,使用终结器的主要原因是更新全局状态(纯函数作为终结器通常毫无意义)。 这让我们陷入了一个难题。 有几种方法可以处理这个问题:

  1. 当单线程时,代码可以调用内部 jl_gc_enable_finalizers C 函数以防止在关键区域内调度终结器。 在内部,这在某些函数(例如我们的 C locks)中使用,以防止在执行某些操作(增量包加载、代码生成等)时发生递归。 锁和此标志的组合可用于使终结器安全。
  1. Base 在几个地方采用的第二种策略是显式延迟终结器,直到它可以非递归地获取其锁。 以下示例演示了如何将此策略应用于 Distributed.finalize_ref
  1. function finalize_ref(r::AbstractRemoteRef)
  2. if r.where > 0 # Check if the finalizer is already run
  3. if islocked(client_refs) || !trylock(client_refs)
  4. # delay finalizer for later if we aren't free to acquire the lock
  5. finalizer(finalize_ref, r)
  6. return nothing
  7. end
  8. try # `lock` should always be followed by `try`
  9. if r.where > 0 # Must check again here
  10. # Do actual cleanup here
  11. r.where = 0
  12. end
  13. finally
  14. unlock(client_refs)
  15. end
  16. end
  17. nothing
  18. end
  1. 相关的第三种策略是使用不需要 yield 的队列。 我们目前没有在 Base 中实现无锁队列,但 Base.InvasiveLinkedListSynchronized{T} 是合适的。 这通常是用于带有事件循环的代码的好策略。 例如,这个策略被 Gtk.jl 用来管理生命周期引用计数。 在这种方法中,我们不会在终结器内部做任何显式工作,而是将其添加到队列中以在更安全的时间运行。 事实上,Julia 的任务调度器已经使用了这种方法,因此将终结器定义为 x -> @spawn do_cleanup(x) 就是这种方法的一个示例。 但是请注意,这并不控制 do_cleanup 在哪个线程上运行,因此 do_cleanup 仍需要获取锁。 如果你实现自己的队列,则不必如此,因为你只能明确地从线程中排出该队列。