概述

首先,关于线程的理解,如果有操作系统的相关知识,那么就很好理解。在操作系统中,线程是进程的一个子集。进程的定义:一个程序的一次运行。而线程则是,多个共享数据的进程。在本文中,线程表示的是在单个 Lisp 进程运行的多个单独的程序,其共享同一部分内存地址。一般来说,系统(不论是 lisp 内核 还是操作系统)会在这写线程间自动切换,因此这些线程看起来是并行运行的(异步)。本章要讲解的是线程的创建和管理,以及一些线程间的交互方式。想要了解更多关于 lisp 和其他进程之间的交互的话,参考阅读第 18 章:系统交互

对一些粗心的人要提醒下,大部分解释其会将线程称为进程——这是历史遗留问题,因为 Lisp 的出现要比术语线程这个词要早。这可以算作是成熟的解释器的标志之一。

ANSI Common Lisp 标准中没有涉及到这个话题。接下来会来讲解关于 bordeaux-threadsSBCL threadslparallel 库。

比较常用的标准库是 —— 波多尔线程(bodeaux-threads),该库是由底层原语实现的。lparallel 就是在波多尔线程上构建的,同时实现了以下的特性:

  • 简单的接收队列自任务模型
  • 详细的并行结构
  • 异步处理线程边界异常
  • 并行版的 map、reduce、sort、remove 等
  • promises, futures, and delayed evaluation constructs
  • 用于并行化相互关联的任务的计算树
  • 绑定与解绑 FIFO 队列
  • 通道
  • 任务优先级
  • 按类别结束任务
  • 集成超时

更多关于并行和并发的库,参见 awesome CL list
Quickdocs.

缘由

首先要问的是:为什么需要线程/进程呢?你可能会简单的回答说“我的引用很简单,根本不需要关注这些。”同在大部分情况下,很难想象没有多线程的复杂程序要怎么设计。例如:

  • 需要开发个服务,这个服务(例如 websockets 章节中的 web 服务)要同时响应多个用户/连接
  • 需要执行后台活动,同时不停止主程序。
  • 需要在一段时间后通知应用
  • 需要在等待系统资源时保持程序的运行活动
  • 需要与多线程的系统(比如说,Windows 下的窗口通常都是在自己的线程中运行)进行通信
  • 需要在应用的不同部分关联不同的上下文(如不同的动态绑定)
  • 可能就是简单的像同时做两件事

并发与并行

鸣谢:以下部分首次是有 Timmy Jose 在 z0ltan.wordpress.com 上发布的。

instance) and context-switching them.
并发是同时运行不同(也许相关)的任务。也就是说,即便是在单核 CPU 的电脑上,也可以用线程的上下文切换来模拟同时性。

在系统(原生操作系统)中,线程的调度和上下文切换最终是由操作系统终止的。Java 和 Common Lisp 中也一样。

线程的调度完全是由程序来控制管理的线程叫“绿色(green)“线程。Erlang 就是个很好的例子。

那么并发和并行有什么差别呢?严格意义上来说,并行是同时在不同处理器或不同的核上平行地运行单个任务。狭义上来说,在单核单处理器的电脑上无法做到并行。

在更抽象的层面能更好的区分这两个概念:并发主要给客户端一种同时性的假象,因此在长操作中也不会出现系统锁定的现象。GUI 系统就是个很好的例子。因此并发关注的是用户体验,不关心性能。

Java Swing 工具套件和 JavaScript 都是单线程的,但它们都有同时性的表现,因为会在后台进行上下文切换。当然咯,大部分情况下并发也是由多线程/处理器实现的。

另一方面,并行,关注点是性能。例如,当要计算出一定范围内偶数的平方时,就可以一个大范围分成多个小块,然后再在不同的核或处理器上并行运行,再将结果给组合起来就是最终结果了。这就是 Map-Reduce 的操作。

现在已经在抽象上区分了并发和并行,之后就是实际上实现的机制。很多人在这对容易搞混。他们倾向于用具体的实现方法来约束抽象概念。从本质上讲,这两个抽象概念可以相同的机制实现!例如,可以使用Java 中相同的基本线程机制来实现并发特性和并行特性。只有抽象层面上,任务概念的相互作用或独立性才有影响。

例如,有个要使用不同的线程(可能是在不同的核/处理器上)才能完成部分工作的任务,但生成这个线程的线程逻辑上依赖衍生线程(不必阻塞该进程)的结果时,这还是并发。

所以最终的结论是:并发和并行是不同的概念,但可以使用相同的机制(如线程、进程等)实现。

波多尔线程

bordeaux 库可以不依赖平台进行基础的线程操作,同时也可以在不同的解释器上运行。有趣的是 bordeaux 是创建原生的线程,而是完全依赖解释器下层来创建线程。

另一方面,bordeaux 在底层线程的抽象中提供了一些很实用特性。

同时,在 demo 程序中,bordeaux 中的函数和 SBCL 中的函数很像。不知道这是不是个巧合。

在下文中会有详细的介绍(具体参见“封装”一节)

安装波多尔线程

通过 quicklisp 安装加载波多尔线程:

  1. CL-USER> (ql:quickload :bt-semaphore)
  2. To load "bt-semaphore":
  3. Load 1 ASDF system:
  4. bt-semaphore
  5. ; Loading "bt-semaphore"
  6. (:BT-SEMAPHORE)

查看是否支持线程

不管使用的是哪个 Common Lisp 解释器,都可以使用标准的方法来检查是否支持线程:

  1. CL-USER> (member :thread-support *FEATURES*)
  2. (:THREAD-SUPPORT :SWANK :QUICKLISP :ASDF-PACKAGE-SYSTEM :ASDF3.1 :ASDF3 :ASDF2
  3. :ASDF :OS-MACOSX :OS-UNIX :NON-BASE-CHARS-EXIST-P :ASDF-UNICODE :64-BIT
  4. :64-BIT-REGISTERS :ALIEN-CALLBACKS :ANSI-CL :ASH-RIGHT-VOPS :BSD
  5. :C-STACK-IS-CONTROL-STACK :COMMON-LISP :COMPARE-AND-SWAP-VOPS
  6. :COMPLEX-FLOAT-VOPS :CYCLE-COUNTER :DARWIN :DARWIN9-OR-BETTER :FLOAT-EQL-VOPS
  7. :FP-AND-PC-STANDARD-SAVE :GENCGC :IEEE-FLOATING-POINT :INLINE-CONSTANTS
  8. :INODE64 :INTEGER-EQL-VOP :LINKAGE-TABLE :LITTLE-ENDIAN
  9. :MACH-EXCEPTION-HANDLER :MACH-O :MEMORY-BARRIER-VOPS :MULTIPLY-HIGH-VOPS
  10. :OS-PROVIDES-BLKSIZE-T :OS-PROVIDES-DLADDR :OS-PROVIDES-DLOPEN
  11. :OS-PROVIDES-PUTWC :OS-PROVIDES-SUSECONDS-T :PACKAGE-LOCAL-NICKNAMES
  12. :PRECISE-ARG-COUNT-ERROR :RAW-INSTANCE-INIT-VOPS :SB-DOC :SB-EVAL :SB-LDB
  13. :SB-PACKAGE-LOCKS :SB-SIMD-PACK :SB-SOURCE-LOCATIONS :SB-TEST :SB-THREAD
  14. :SB-UNICODE :SBCL :STACK-ALLOCATABLE-CLOSURES :STACK-ALLOCATABLE-FIXED-OBJECTS
  15. :STACK-ALLOCATABLE-LISTS :STACK-ALLOCATABLE-VECTORS
  16. :STACK-GROWS-DOWNWARD-NOT-UPWARD :SYMBOL-INFO-VOPS :UD2-BREAKPOINTS :UNIX
  17. :UNWIND-TO-FRAME-AND-CALL-VOP :X86-64)

如果解释器不支持线程的话,上面的代码会返回 “NIL”。

每个库都有自己的检查是否支持并发的方法,这就可可以不用上面的通用检查方法来检查了。

例如,在下面的例子中,使用的是 bordeaux 库。可以查看 supports-threads-p 这个全局变量来检查是否支持多线程,如果 support-threads-p 是 NIL,则不支持,T 的话就是支持:

  1. CL-USER> bt:*supports-threads-p*
  2. T

好了,现在已经知道这个方法了,那么现在来试试这个平台无关库(bordeaxu)和特定的平台(SBCL)吧。

为了这点,先看一些简单的例子吧:

  • 基础 —— 列出当前线程、所有线程以及获取线程名
  • 更新全局变量
  • 在顶层打印消息
  • 打印消息 —— 修复版
  • 打印消息 —— 进阶版
  • 修改多线程共享资源
  • 修改共享资源 —— 有锁版
  • 修改共享资源 —— 原子操作版
  • 阻塞,销毁进程

基础 —— 列出当前线程、所有线程以及获取线程名

  1. ;;; Print the current thread, all the threads, and the current thread's name
  2. (defun print-thread-info ()
  3. (let* ((curr-thread (bt:current-thread))
  4. (curr-thread-name (bt:thread-name curr-thread))
  5. (all-threads (bt:all-threads)))
  6. (format t "Current thread: ~a~%~%" curr-thread)
  7. (format t "Current thread name: ~a~%~%" curr-thread-name)
  8. (format t "All threads:~% ~{~a~%~}~%" all-threads))
  9. nil)

输入如下:

  1. CL-USER> (print-thread-info)
  2. Current thread: #<THREAD "repl-thread" RUNNING {10043B8003}>
  3. Current thread name: repl-thread
  4. All threads:
  5. #<THREAD "repl-thread" RUNNING {10043B8003}>
  6. #<THREAD "auto-flush-thread" RUNNING {10043B7DA3}>
  7. #<THREAD "swank-indentation-cache-thread" waiting on: #<WAITQUEUE {1003A28103}> {1003A201A3}>
  8. #<THREAD "reader-thread" RUNNING {1003A20063}>
  9. #<THREAD "control-thread" waiting on: #<WAITQUEUE {1003A19E53}> {1003A18C83}>
  10. #<THREAD "Swank Sentinel" waiting on: #<WAITQUEUE {1003790043}> {1003788023}>
  11. #<THREAD "main thread" RUNNING {1002991CE3}>
  12. NIL

从线程中更新全局变量:

  1. (defparameter *counter* 0)
  2. (defun test-update-global-variable ()
  3. (bt:make-thread
  4. (lambda ()
  5. (sleep 1)
  6. (incf *counter*)))
  7. *counter*)

可以使用 bt:make-thread 函数创建新新线程,该函数的参数是个抽象的 lambda 表达式。注意,该 lambda 表达式不能有参数。

还有一点需要注意的是,与其他语言不同的是,线程对象的创建和执行之间没有分隔。也就是说,线程创建好后就会直接执行。

输出如下:

  1. CL-USER> (test-update-global-variable)
  2. 0
  3. CL-USER> *counter*
  4. 1

如上所示,主线程立即返回了 *counter* 的初始值 0,大概在一秒钟后,匿名线程就会将该值跟新到 1。

线程创建:打印消息

  1. ;;; Print a message onto the top-level using a thread
  2. (defun print-message-top-level-wrong ()
  3. (bt:make-thread
  4. (lambda ()
  5. (format *standard-output* "Hello from thread!"))
  6. :name "hello")
  7. nil)

输出为:

  1. CL-USER> (print-message-top-level-wrong)
  2. NIL

哪里错了呢?问题出在变量绑定上。在这里,将 format 函数的参数设为 t 的话,输出就会指向的是顶层(Common Lisp 终端控制流中的常见术语),也指向了全局变量 *standard-output*。因此就可以在控制台中看到输出。

如果在同一个线程中运行上面的代码的话,是完全没有问题的。但当在不同的线程中,每个线程都会有单独的栈来保存变量。因此,即便是全局变量 *standard-output*(假设所有的线程中都有)也会在每个线程中重新绑定!这和 Java 中的 ThreadLocal 存储概念很相似。现在需要对上面的代码进行修复。

但是要怎么来修复这个问题呢?当然是在线程创建时将输出绑定到顶层咯。纯词法作用域是可以挽救以下这种情况的!

  1. ;;; Print a message onto the top-level using a thread fixed
  2. (defun print-message-top-level-fixed ()
  3. (let ((top-level *standard-output*))
  4. (bt:make-thread
  5. (lambda ()
  6. (format top-level "Hello from thread!"))
  7. :name "hello")))
  8. nil)

输出结果是:

  1. CL-USER> (print-message-top-level-fixed)
  2. Hello from thread!
  3. NIL

哎唷,可以了。但是,下面还有个好玩的宏也能返回同样的结果。

打印消息 —— read-time eval macro

先看代码:

  1. ;;; Print a message onto the top-level using a thread - reader macro
  2. (eval-when (:compile-toplevel)
  3. (defun print-message-top-level-reader-macro ()
  4. (bt:make-thread
  5. (lambda ()
  6. (format #.*standard-output* "Hello from thread!")))
  7. nil))
  8. (print-message-top-level-reader-macro)

输出结果是:

  1. CL-USER> (print-message-top-level-reader-macro)
  2. Hello from thread!
  3. NIL

结果和上面是一样的,但 *standard-output* 前面奇怪的 #. 是做什么的呢?

eval-when 控制什么时候对 Lisp 表达式求值。这里有三个 target::compile-toplevel、:load-toplevel 和 :execute

#. 会调用 “Reader 宏”。因为这个宏在 Common Lisp 中有特殊含义的,所以被叫做 reader(或 read)宏。其特殊性在于负责会读取 Common Lisp 表达式然后解析读入的表达式。这个特殊的 reader 宏确保在读取时绑定 *standard-output*

在读取时绑定值能够确保线程在运行时绑定的是原始值 *standard-output*,这样才能在顶层输出。

现在,是 eval-when 发挥作用的时候了。将整个函数定义封装到 eval-when 中,确保编译时间会进行求值替换,*standard-output* 也绑定到正确的值。但如果不用 eval-when 的话,就会看到下面的错误:

  1. error:
  2. don't know how to dump #<SWANK/GRAY::SLIME-OUTPUT-STREAM {100439EEA3}> (default MAKE-LOAD-FORM method called).
  3. ==>
  4. #<SWANK/GRAY::SLIME-OUTPUT-STREAM {100439EEA3}>
  5. note: The first argument never returns a value.
  6. note:
  7. deleting unreachable code
  8. ==>
  9. "Hello from thread!"
  10. Compilation failed.

这就解释的通了,由于输出流返回的结果是个流而不是个定义好的值(format 函数所期望的),所以 SBCL 无法解析这个结果。这就是“unreachable code” 错误的原因。

注意,如果直接在 REPL 中直接运行代码的话是完全没问题的,因为 REPL 线程会正确的处理所有的符号。

修改共享资源

假设有个简单的 bank-account 类(无错误检查):

  1. ;;; Modify a shared resource from multiple threads
  2. (defclass bank-account ()
  3. ((id :initarg :id
  4. :initform (error "id required")
  5. :accessor :id)
  6. (name :initarg :name
  7. :initform (error "name required")
  8. :accessor :name)
  9. (balance :initarg :balance
  10. :initform 0
  11. :accessor :balance)))
  12. (defgeneric deposit (account amount)
  13. (:documentation "Deposit money into the account"))
  14. (defgeneric withdraw (account amount)
  15. (:documentation "Withdraw amount from account"))
  16. (defmethod deposit ((account bank-account) (amount real))
  17. (incf (:balance account) amount))
  18. (defmethod withdraw ((account bank-account) (amount real))
  19. (decf (:balance account) amount))

同时有个客户端,该客户端不信任任何形式的同步:

  1. (defparameter *rich*
  2. (make-instance 'bank-account
  3. :id 1
  4. :name "Rich"
  5. :balance 0))
  6. ; compiling (DEFPARAMETER *RICH* ...)
  7. (defun demo-race-condition ()
  8. (loop repeat 100
  9. do
  10. (bt:make-thread
  11. (lambda ()
  12. (loop repeat 10000 do (deposit *rich* 100))
  13. (loop repeat 10000 do (withdraw *rich* 100))))))

来看下下面这些操作:创建个新的银行账户实例(账户余额(balance)为 0),然后创建 100 个线程,每个线程会执行 10000 次存入 100 额度的操作,之后在执行相同次数的提取操作。最终的结果应该是和开户时一样,都是 0,对吧?那么现在来检查下对不对。

将示例运行后,可能会得到如下的结果:

  1. CL-USER> (:balance *rich*)
  2. 0
  3. CL-USER> (dotimes (i 5)
  4. (demo-race-condition))
  5. NIL
  6. CL-USER> (:balance *rich*)
  7. 22844600

哇!造成这个差异的原因是 incf 和 decf 不是原子操作——它们是分为好几个步骤操作的,而且执行的顺序也不受控制。

这种现象叫做“竞争条件” —— 多个线程同时竞争相同的共享资源,一个值在修改的同时被读取,这得到的就是个错误的值。那要怎么修复这个问题呢?简单的办法就是使用锁(本例中使用的是互斥锁(mutex),在复杂的情况下使用信号量(semaphores))

修改共享资源:修复版(使用锁)

现将之前的账户余额该会到 0:

  1. CL-USER> (setf (:balance *rich*) 0)
  2. 0
  3. CL-USER> (:balance *rich*)
  4. 0

然后在 demo-race-condition 函数中对共享资源加上锁(bt:make-lock 创建锁):

  1. (defvar *lock* (bt:make-lock))
  2. ; compiling (DEFVAR *LOCK* …)
  3. (defun demo-race-condition-locks ()
  4. (loop repeat 100
  5. do
  6. (bt:make-thread
  7. (lambda ()
  8. (loop repeat 10000 do (bt:with-lock-held (*lock*)
  9. (deposit *rich* 100)))
  10. (loop repeat 10000 do (bt:with-lock-held (*lock*)
  11. (withdraw *rich* 100)))))))
  12. ; compiling (DEFUN DEMO-RACE-CONDITION-LOCKS ...)

然后执行和上面相同的操作:

  1. CL-USER> (dotimes (i 100)
  2. (demo-race-condition-locks))
  3. NIL
  4. CL-USER> (:balance *rich*)
  5. 0

很好!现在这个就要好多了。当然,记住互斥锁的使用影响性能。在这种情况下还有个更好的方法 —— 尽可能的使用原子操作。下面就来讲解吧。

修改共享资源:原子操作版

原子操作是操作系统为保证内部事物执行的操作,也就是说,主任务中的子任务不会受外部影响。操作要么成功,要么失败,没有中间层,也没有不一致的状态。

还有一点是,在保护共享资源的访问方面,原子操作的性能要远远优于比互斥锁。这个在 demo 中可以看到差别。

bordeaux 库并不支持原子操作,因此需要依赖于解释器来实现这个。在本章的例子中,使用的是 SBCL,在下面的 SBCL 章节中会有相应的 demo。

线程阻塞、销毁

阻塞线程使用 bt:join-thread 函数,销毁线程(不推荐)的话使用 bt:destory-thread 函数。

一个简单的 demo:

  1. (defmacro until (condition &body body)
  2. (let ((block-name (gensym)))
  3. `(block ,block-name
  4. (loop
  5. (if ,condition
  6. (return-from ,block-name nil)
  7. (progn
  8. ,@body))))))
  9. (defun join-destroy-thread ()
  10. (let* ((s *standard-output*)
  11. (joiner-thread (bt:make-thread
  12. (lambda ()
  13. (loop for i from 1 to 10
  14. do
  15. (format s "~%[Joiner Thread] Working...")
  16. (sleep (* 0.01 (random 100)))))))
  17. (destroyer-thread (bt:make-thread
  18. (lambda ()
  19. (loop for i from 1 to 1000000
  20. do
  21. (format s "~%[Destroyer Thread] Working...")
  22. (sleep (* 0.01 (random 10000))))))))
  23. (format t "~%[Main Thread] Waiting on joiner thread...")
  24. (bt:join-thread joiner-thread)
  25. (format t "~%[Main Thread] Done waiting on joiner thread")
  26. (if (bt:thread-alive-p destroyer-thread)
  27. (progn
  28. (format t "~%[Main Thread] Destroyer thread alive... killing it")
  29. (bt:destroy-thread destroyer-thread))
  30. (format t "~%[Main Thread] Destroyer thread is already dead"))
  31. (until (bt:thread-alive-p destroyer-thread)
  32. (format t "[Main Thread] Waiting for destroyer thread to die..."))
  33. (format t "~%[Main Thread] Destroyer thread dead")
  34. (format t "~%[Main Thread] Adios!~%")))

运行后的结果如下:

  1. CL-USER> (join-destroy-thread)
  2. [Joiner Thread] Working...
  3. [Destroyer Thread] Working...
  4. [Main Thread] Waiting on joiner thread...
  5. [Joiner Thread] Working...
  6. [Joiner Thread] Working...
  7. [Joiner Thread] Working...
  8. [Joiner Thread] Working...
  9. [Joiner Thread] Working...
  10. [Joiner Thread] Working...
  11. [Joiner Thread] Working...
  12. [Joiner Thread] Working...
  13. [Joiner Thread] Working...
  14. [Main Thread] Done waiting on joiner thread
  15. [Main Thread] Destroyer thread alive... killing it
  16. [Main Thread] Destroyer thread dead
  17. [Main Thread] Adios!
  18. NIL

只有当条件为真时,until 才会退出循环。剩下的代码就不用详细介绍了——主线程等待阻塞线程完成,然后立即销毁。

再次声明,不推荐使用 bt:destory-thread。任意要用 bt:destory-thread 函数的场景都可以找到一种更好的方法进行替代。

接下来,来看一些更完整的例子吧,这些例子会将目前所提到的概念全都联系在一起。

实用函数

下面是 demo 中用到的函数、宏和全局变量以及一些额外的内容的摘要,基本上涵盖了大多数编程场景:

  • bt:*supports-thread-p* (检查是否支持基础线程操作)
  • bt:make-thread (创建线程)
  • bt:current-thread (返回当前线程对象)
  • bt:all-threads (返回执行中的所有线程列表)
  • bt:thread-alive-p (检查线程是否存活)
  • bt:thread-name (返回线程名)
  • bt:join-thread (阻塞线程)
  • bt:interrupt-thread (中断线程)
  • bt:destroy-thread (退出/销毁线程)
  • bt:make-lock (创建互斥锁)
  • bt:with-lock-held (线程锁)

SBCL 线程

SBCL 中的 sb-thread 包中支持原生多线程。里面都是低级函数,但可以在顶层构建自己的抽象,就像 demo 中展示的一样。

更多的细节可以查看下文的“封装”部分。

通过下面的例子可以看出,bordeaux 和 SBCL 关于线程的函数有很强的对应关系。在大部分情况中,只是将包的名字由 bt 改成 sb-thread。

很明显,Bordeaux 线程库或多或少是基于 SBCL 实现的。因此,只有在语法或语义上有很大差别时才会进行讲解。

基础 —— 列出当前线程、所有线程以及获取线程名

代码如下:

  1. ;;; Print the current thread, all the threads, and the current thread's name
  2. (defun print-thread-info ()
  3. (let* ((curr-thread sb-thread:*current-thread*)
  4. (curr-thread-name (sb-thread:thread-name curr-thread))
  5. (all-threads (sb-thread:list-all-threads)))
  6. (format t "Current thread: ~a~%~%" curr-thread)
  7. (format t "Current thread name: ~a~%~%" curr-thread-name)
  8. (format t "All threads:~% ~{~a~%~}~%" all-threads))
  9. nil)

输出结果是:

  1. CL-USER> (print-thread-info)
  2. Current thread: #<THREAD "repl-thread" RUNNING {10043B8003}>
  3. Current thread name: repl-thread
  4. All threads:
  5. #<THREAD "repl-thread" RUNNING {10043B8003}>
  6. #<THREAD "auto-flush-thread" RUNNING {10043B7DA3}>
  7. #<THREAD "swank-indentation-cache-thread" waiting on: #<WAITQUEUE {1003A28103}> {1003A201A3}>
  8. #<THREAD "reader-thread" RUNNING {1003A20063}>
  9. #<THREAD "control-thread" waiting on: #<WAITQUEUE {1003A19E53}> {1003A18C83}>
  10. #<THREAD "Swank Sentinel" waiting on: #<WAITQUEUE {1003790043}> {1003788023}>
  11. #<THREAD "main thread" RUNNING {1002991CE3}>
  12. NIL

更新全局变量

代码如下:

  1. ;;; Update a global variable from a thread
  2. (defparameter *counter* 0)
  3. (defun test-update-global-variable ()
  4. (sb-thread:make-thread
  5. (lambda ()
  6. (sleep 1)
  7. (incf *counter*)))
  8. *counter*)

结果如下:

  1. CL-USER> (test-update-global-variable)
  2. 0

打印消息

代码如下:

  1. ;;; Print a message onto the top-level using a thread
  2. (defun print-message-top-level-wrong ()
  3. (sb-thread:make-thread
  4. (lambda ()
  5. (format *standard-output* "Hello from thread!")))
  6. nil)

输出如下:

  1. CL-USER> (print-message-top-level-wrong)
  2. NIL

修复版:

  1. ;;; Print a message onto the top-level using a thread - fixed
  2. (defun print-message-top-level-fixed ()
  3. (let ((top-level *standard-output*))
  4. (sb-thread:make-thread
  5. (lambda ()
  6. (format top-level "Hello from thread!"))))
  7. nil)

结果如下:

  1. CL-USER> (print-message-top-level-fixed)
  2. Hello from thread!
  3. NIL

打印消息:优化版

代码:

  1. ;;; Print a message onto the top-level using a thread - reader macro
  2. (eval-when (:compile-toplevel)
  3. (defun print-message-top-level-reader-macro ()
  4. (sb-thread:make-thread
  5. (lambda ()
  6. (format #.*standard-output* "Hello from thread!")))
  7. nil))

结果:

  1. CL-USER> (print-message-top-level-reader-macro)
  2. Hello from thread!
  3. NIL

修改共享资源

代码:

  1. ;;; Modify a shared resource from multiple threads
  2. (defclass bank-account ()
  3. ((id :initarg :id
  4. :initform (error "id required")
  5. :accessor :id)
  6. (name :initarg :name
  7. :initform (error "name required")
  8. :accessor :name)
  9. (balance :initarg :balance
  10. :initform 0
  11. :accessor :balance)))
  12. (defgeneric deposit (account amount)
  13. (:documentation "Deposit money into the account"))
  14. (defgeneric withdraw (account amount)
  15. (:documentation "Withdraw amount from account"))
  16. (defmethod deposit ((account bank-account) (amount real))
  17. (incf (:balance account) amount))
  18. (defmethod withdraw ((account bank-account) (amount real))
  19. (decf (:balance account) amount))
  20. (defparameter *rich*
  21. (make-instance 'bank-account
  22. :id 1
  23. :name "Rich"
  24. :balance 0))
  25. (defun demo-race-condition ()
  26. (loop repeat 100
  27. do
  28. (sb-thread:make-thread
  29. (lambda ()
  30. (loop repeat 10000 do (deposit *rich* 100))
  31. (loop repeat 10000 do (withdraw *rich* 100))))))

结果:

  1. CL-USER> (:balance *rich*)
  2. 0
  3. CL-USER> (demo-race-condition)
  4. NIL
  5. CL-USER> (:balance *rich*)
  6. 3987400

修改共享资源:修复版(互斥锁)

代码:

  1. (defvar *lock* (sb-thread:make-mutex))
  2. (defun demo-race-condition-locks ()
  3. (loop repeat 100
  4. do
  5. (sb-thread:make-thread
  6. (lambda ()
  7. (loop repeat 10000 do (sb-thread:with-mutex (*lock*)
  8. (deposit *rich* 100)))
  9. (loop repeat 10000 do (sb-thread:with-mutex (*lock*)
  10. (withdraw *rich* 100)))))))

唯一不同的是在 bordeaux 中用的是 make-lock,这里用的是 make-mutex,配合示例中的宏 with-mutex 使用。

输出结果是:

  1. CL-USER> (:balance *rich*)
  2. 0
  3. CL-USER> (demo-race-condition-locks)
  4. NIL
  5. CL-USER> (:balance *rich*)
  6. 0

修改共享资源:原子操作版

首先,代码如下:

  1. ;;; Modify a shared resource from multiple threads - atomics
  2. (defgeneric atomic-deposit (account amount)
  3. (:documentation "Atomic version of the deposit method"))
  4. (defgeneric atomic-withdraw (account amount)
  5. (:documentation "Atomic version of the withdraw method"))
  6. (defmethod atomic-deposit ((account bank-account) (amount real))
  7. (sb-ext:atomic-incf (car (cons (:balance account) nil)) amount))
  8. (defmethod atomic-withdraw ((account bank-account) (amount real))
  9. (sb-ext:atomic-decf (car (cons (:balance account) nil)) amount))
  10. (defun demo-race-condition-atomics ()
  11. (loop repeat 100
  12. do (sb-thread:make-thread
  13. (lambda ()
  14. (loop repeat 10000 do (atomic-deposit *rich* 100))
  15. (loop repeat 10000 do (atomic-withdraw *rich* 100))))))

结果是:

  1. CL-USER> (dotimes (i 5)
  2. (format t "~%Opening: ~d" (:balance *rich*))
  3. (demo-race-condition-atomics)
  4. (format t "~%Closing: ~d~%" (:balance *rich*)))
  5. Opening: 0
  6. Closing: 0
  7. Opening: 0
  8. Closing: 0
  9. Opening: 0
  10. Closing: 0
  11. Opening: 0
  12. Closing: 0
  13. Opening: 0
  14. Closing: 0
  15. NIL

如你所见,SBCL 的原子操作优点怪。这里使用到了两个函数:sb-ext:incfsb-ext:atomic-decf,它们的格式分别为:

  1. Macro: atomic-incf [sb-ext] place &optional diff

  1. Macro: atomic-decf [sb-ext] place &optional diff

比较有趣的地方是 “place” 参数需要是以下中一个(根据文件):

  • 具有声明类型(64位无符号字节类型)的 defstruct 属性或 simple-array(64为无符号字节类型)的 aref。 sb-ext:word 类型可用于这些目的。
  • cons 中的 car 或 cdr 类型(对应第一个和剩下的)
  • defglobal 定义的已声明的修正数(fixnum)变量

这就是为什么会在 atomic-depositatomic-decf 方法中使用 bizarre 结构。

尽可能的使用原子操作是考虑到性能方面。现在快速地将 demo-race-condition-locks 和 demo-race-condition-atomics 函数分别执行 1000 次,然后查看它们的性能上的差别(如果有的话):

使用互斥锁:

  1. CL-USER> (time
  2. (loop repeat 100
  3. do (demo-race-condition-locks)))
  4. Evaluation took:
  5. 57.711 seconds of real time
  6. 431.451639 seconds of total run time (408.014746 user, 23.436893 system)
  7. 747.61% CPU
  8. 126,674,011,941 processor cycles
  9. 3,329,504 bytes consed
  10. NIL

使用原子操作:

  1. CL-USER> (time
  2. (loop repeat 100
  3. do (demo-race-condition-atomics)))
  4. Evaluation took:
  5. 2.495 seconds of real time
  6. 8.175454 seconds of total run time (6.124259 user, 2.051195 system)
  7. [ Run times consist of 0.420 seconds GC time, and 7.756 seconds non-GC time. ]
  8. 327.66% CPU
  9. 5,477,039,706 processor cycles
  10. 3,201,582,368 bytes consed
  11. NIL

那么结果呢?互斥锁版本大概用了 57s,而原子操作只用的 2s! 这个简直是天差地别!

线程阻塞、销毁

代码如下:

  1. ;;; Joining on and destroying a thread
  2. (defmacro until (condition &body body)
  3. (let ((block-name (gensym)))
  4. `(block ,block-name
  5. (loop
  6. (if ,condition
  7. (return-from ,block-name nil)
  8. (progn
  9. ,@body))))))
  10. (defun join-destroy-thread ()
  11. (let* ((s *standard-output*)
  12. (joiner-thread (sb-thread:make-thread
  13. (lambda ()
  14. (loop for i from 1 to 10
  15. do
  16. (format s "~%[Joiner Thread] Working...")
  17. (sleep (* 0.01 (random 100)))))))
  18. (destroyer-thread (sb-thread:make-thread
  19. (lambda ()
  20. (loop for i from 1 to 1000000
  21. do
  22. (format s "~%[Destroyer Thread] Working...")
  23. (sleep (* 0.01 (random 10000))))))))
  24. (format t "~%[Main Thread] Waiting on joiner thread...")
  25. (bt:join-thread joiner-thread)
  26. (format t "~%[Main Thread] Done waiting on joiner thread")
  27. (if (sb-thread:thread-alive-p destroyer-thread)
  28. (progn
  29. (format t "~%[Main Thread] Destroyer thread alive... killing it")
  30. (sb-thread:terminate-thread destroyer-thread))
  31. (format t "~%[Main Thread] Destroyer thread is already dead"))
  32. (until (sb-thread:thread-alive-p destroyer-thread)
  33. (format t "[Main Thread] Waiting for destroyer thread to die..."))
  34. (format t "~%[Main Thread] Destroyer thread dead")
  35. (format t "~%[Main Thread] Adios!~%")))

结果为:

  1. CL-USER> (join-destroy-thread)
  2. [Joiner Thread] Working...
  3. [Destroyer Thread] Working...
  4. [Main Thread] Waiting on joiner thread...
  5. [Joiner Thread] Working...
  6. [Joiner Thread] Working...
  7. [Joiner Thread] Working...
  8. [Joiner Thread] Working...
  9. [Joiner Thread] Working...
  10. [Joiner Thread] Working...
  11. [Joiner Thread] Working...
  12. [Joiner Thread] Working...
  13. [Joiner Thread] Working...
  14. [Main Thread] Done waiting on joiner thread
  15. [Main Thread] Destroyer thread alive... killing it
  16. [Main Thread] Destroyer thread dead
  17. [Main Thread] Adios!
  18. NIL

实用函数

下面是示例中用到的函数、宏和全局变量以及一些额外的内容的摘要,基本上涵盖了大多数编程场景:

  • (member :thread-support *features*) (检查是否支持多线程)
  • sb-thread:make-thread (线程创建)
  • sb-thread:*current-thread* (获取当前线程对象)
  • sb-thread:list-all-threads (返回所有运行中的线程)
  • sb-thread:thread-alive-p (检查线程是否存活)
  • sb-thread:thread-name (返回线程名)
  • sb-thread:join-thread (线程阻塞)
  • sb-thread:interrupt-thread (线程中断)
  • sb-thread:destroy-thread (退出/销毁线程)
  • sb-thread:make-mutex (创建互斥锁)
  • sb-thread:with-mutex (线程锁)

封装

如你所见,Common Lisp 对并行的支持是很简单的,这主要是因为 ANSI Common Lisp 规范中缺少这个重要的特性。但这并没有减少 Common Lisp 实现,也没有减少像 bordeaux 这样的库。

可以自行选择深入阅读相关内容。以下是我自己的参考:

接下来,是关于这个迷你系列的最后一片文章:在 Common Lisp 中使用 lparallel 库。

并行编程:lparallel

值得注意的是,lparallel 也支持异步编程,而不是个纯并行库。像之前所说的那样,并行只是个抽象的概念,在并行中,任务之间是相互独立的。

lparallel 库基于 bordeaux 线程库构建的。

像之前提到的那样,并行和并发可以通过相同的方法实现(一般就是这样的):线程、进程等。唯一不同的就是概念上的差别。

注意本文中的例子不一定都是并行的。通常来说,像 Promises 和 Futures 中的异步构造更适合并发。

lparallel 库的操作方式如下(基本情况):

  • 通过 lparallel:make-kernel 调用内核来创建示例。其中内核就是调度和执行任务的组件。
  • 按照 futures、promises 和其他高级函数概念设计代码。为了实现这些,lparallel 支持 channelspromisesfuturecognate
  • 使用 lparallel 库调用 cognates 执行操作,其中 cognates 在 Common Lisp 中有等价的函数。比如说,并行函数 lparallel:pmap 等价于 Common Lisp 中的 map
  • 最后,关掉第一步中通过 kernel 创建的实例 lparallel:end-kernel

注意,确保运行中的任务需要符合逻辑,以及处理所有可变状态是由开发人员负责。

鸣谢:本文首次出现在 z0ltan.wordpress.com.

安装

先来看看能不能用 Quicklisp 安装 lparallel:

  1. CL-USER> (ql:system-apropos "lparallel")
  2. #<SYSTEM lparallel / lparallel-20160825-git / quicklisp 2016-08-25>
  3. #<SYSTEM lparallel-bench / lparallel-20160825-git / quicklisp 2016-08-25>
  4. #<SYSTEM lparallel-test / lparallel-20160825-git / quicklisp 2016-08-25>
  5. ; No value

看起来是可以,那么直接安装吧:

  1. CL-USER> (ql:quickload :lparallel)
  2. To load "lparallel":
  3. Load 2 ASDF systems:
  4. alexandria bordeaux-threads
  5. Install 1 Quicklisp release:
  6. lparallel
  7. ; Fetching #<URL "http://beta.quicklisp.org/archive/lparallel/2016-08-25/lparallel-20160825-git.tgz">
  8. ; 76.71KB
  9. ==================================================
  10. 78,551 bytes in 0.62 seconds (124.33KB/sec)
  11. ; Loading "lparallel"
  12. [package lparallel.util]..........................
  13. [package lparallel.thread-util]...................
  14. [package lparallel.raw-queue].....................
  15. [package lparallel.cons-queue]....................
  16. [package lparallel.vector-queue]..................
  17. [package lparallel.queue].........................
  18. [package lparallel.counter].......................
  19. [package lparallel.spin-queue]....................
  20. [package lparallel.kernel]........................
  21. [package lparallel.kernel-util]...................
  22. [package lparallel.promise].......................
  23. [package lparallel.ptree].........................
  24. [package lparallel.slet]..........................
  25. [package lparallel.defpun]........................
  26. [package lparallel.cognate].......................
  27. [package lparallel]
  28. (:LPARALLEL)

下面就来看看怎么使用 lparallel 库吧。

序:调用 CFFI 获取 CPU 核的数量

首先,要先了解要在并行的例子中使用的线程数。理想状况下,线程与 CPU 核的最好为 1:1。

cffi 库可以实现这个目标。本来是准备为 cffi 库写篇详细的文章,但现在,还是继续本章的话题吧:

安装 CFFI:

  1. CL-USER> (ql:quickload :cffi)
  2. To load "cffi":
  3. Load 4 ASDF systems:
  4. alexandria babel trivial-features uiop
  5. Install 1 Quicklisp release:
  6. cffi
  7. ; Fetching #<URL "http://beta.quicklisp.org/archive/cffi/2016-03-18/cffi_0.17.1.tgz">
  8. ; 234.48KB
  9. ==================================================
  10. 240,107 bytes in 5.98 seconds (39.22KB/sec)
  11. ; Loading "cffi"
  12. [package cffi-sys]................................
  13. [package cffi]....................................
  14. ..................................................
  15. [package cffi-features]
  16. (:CFFI)

用 C 语言获取计算机上的逻辑核数量:

  1. #include <stdio.h>
  2. #include <sys/types.h>
  3. #include <sys/sysctl.h>
  4. int get_core_count();
  5. int main()
  6. {
  7. printf("%d\n", get_core_count());
  8. return 0;
  9. }
  10. int32_t get_core_count()
  11. {
  12. const char* s = "hw.logicalcpu";
  13. int32_t core_count;
  14. size_t len = sizeof(core_count);
  15. sysctlbyname(s, &core_count, &len, NULL, 0);
  16. return core_count;
  17. }

将上面 C 代码编译成共享库(注:本人使用的是 Mac OS X,使用的是 Clang。用 gcc 的话,请参考相关文档):

  1. Timmys-MacBook-Pro:Parallelism z0ltan$ clang -dynamiclib get_core_count.c -o libcorecount.dylib

在 Common Lisp 中调用这个函数:

  1. CL-USER> (cffi:use-foreign-library "libcorecount.dylib")
  2. #<CFFI:FOREIGN-LIBRARY LIBCORECOUNT.DYLIB-853 "libcorecount.dylib">
  3. CL-USER> (cffi:foreign-funcall "get_core_count" :int)
  4. 8

可以看到结果计算机 CPU 的核(core)的数量是 8(这是正确的),可以通过下面的命令验证:

  1. Timmys-MacBook-Pro:Parallelism z0ltan$ sysctl -n "hw.logicalcpu"

通用配置

在本例中,会讲解初始化配置,同时会介绍这些配置的作用。

加载库:

  1. CL-USER> (ql:quickload :lparallel)
  2. To load "lparallel":
  3. Load 1 ASDF system:
  4. lparallel
  5. ; Loading "lparallel"
  6. (:LPARALLEL)

初始化 lparallel 内核:

  1. CL-USER> (setf lparallel:*kernel* (lparallel:make-kernel 8 :name "custom-kernel"))
  2. #<LPARALLEL.KERNEL:KERNEL :NAME "custom-kernel" :WORKER-COUNT 8 :USE-CALLER NIL :ALIVE T :SPIN-COUNT 2000 {1003141F03}>

注意:全局变量 *kernel* 可以重新赋值,这样就可以在同一运行期间共存多个内核。之后,定义 kernel 的相关信息:

  1. CL-USER> (defun show-kernel-info ()
  2. (let ((name (lparallel:kernel-name))
  3. (count (lparallel:kernel-worker-count))
  4. (context (lparallel:kernel-context))
  5. (bindings (lparallel:kernel-bindings)))
  6. (format t "Kernel name = ~a~%" name)
  7. (format t "Worker threads count = ~d~%" count)
  8. (format t "Kernel context = ~a~%" context)
  9. (format t "Kernel bindings = ~a~%" bindings)))
  10. WARNING: redefining COMMON-LISP-USER::SHOW-KERNEL-INFO in DEFUN
  11. SHOW-KERNEL-INFO
  12. CL-USER> (show-kernel-info)
  13. Kernel name = custom-kernel
  14. Worker threads count = 8
  15. Kernel context = #<FUNCTION FUNCALL>
  16. Kernel bindings = ((*STANDARD-OUTPUT* . #<SLIME-OUTPUT-STREAM {10044EEEA3}>)
  17. (*ERROR-OUTPUT* . #<SLIME-OUTPUT-STREAM {10044EEEA3}>))
  18. NIL

终止内核(这点很重要,因为不手动结束内核的话, *kernel* 不会被垃圾回收掉):

  1. CL-USER> (lparallel:end-kernel :wait t)
  2. (#<SB-THREAD:THREAD "custom--kernel" FINISHED values: NIL {100723FA83}>
  3. #<SB-THREAD:THREAD "custom--kernel" FINISHED values: NIL {100723FE23}>
  4. #<SB-THREAD:THREAD "custom--kernel" FINISHED values: NIL {10072581E3}>
  5. #<SB-THREAD:THREAD "custom--kernel" FINISHED values: NIL {1007258583}>
  6. #<SB-THREAD:THREAD "custom--kernel" FINISHED values: NIL {1007258923}>
  7. #<SB-THREAD:THREAD "custom--kernel" FINISHED values: NIL {1007258CC3}>
  8. #<SB-THREAD:THREAD "custom--kernel" FINISHED values: NIL {1007259063}>
  9. #<SB-THREAD:THREAD "custom--kernel" FINISHED values: NIL {1007259403}>)

继续来看 lparallel 库的其他方面。

对于之后的 demo,从代码的角度会使用下面的初始设置:

  1. (require lparallel)
  2. (require bt-semaphore)
  3. (defpackage :lparallel-user
  4. (:use :cl :lparallel :lparallel.queue :bt-semaphore))
  5. (in-package :lparallel-user)
  6. ;;; initialise the kernel
  7. (defun init ()
  8. (setf *kernel* (make-kernel 8 :name "channel-queue-kernel")))
  9. (init)

所以内核使用的是 8 线程(可根据自己计算机自行设定)。

一旦运行完所有的例子后,需要运行下面代码来关闭内核并释放系统资源:

  1. ;;; shut the kernel down
  2. (defun shutdown ()
  3. (end-kernel :wait t))
  4. (shutdown)

通道(channels)和队列(queues)

首先,某些定义是有顺序的。

提交个内核的任务叫做 task。这只是个带参函数对象。

lparallel 中的通道(channel)和 Go 中通道的概念相似。只是与线程通信的一种方式。在本章的例子中,channel 就是个将任务提交给内核的特殊方法。

在 lparallel 中用 lparallel:make-channel 创建 channel。使用 lparallel:submit-task 提交任务,接受任务使用 lparallel:receive-result

例如说,计算数字的平方值:

  1. (defun calculate-square (n)
  2. (let* ((channel (lparallel:make-channel))
  3. (res nil))
  4. (lparallel:submit-task channel #'(lambda (x)
  5. (* x x))
  6. n)
  7. (setf res (lparallel:receive-result channel))
  8. (format t "Square of ~d = ~d~%" n res)))

输出为:

  1. LPARALLEL-USER> (calculate-square 100)
  2. Square of 100 = 10000
  3. NIL

现在,要将多个任务提交到统一个通道中。在下面的这个简单的例子中,只创建三个简单的任务,分别是计算输入的平方、三次放和四次方:

注意在多个任务重,输出结果的顺序并不确定:

  1. (defun test-basic-channel-multiple-tasks ()
  2. (let ((channel (make-channel))
  3. (res '()))
  4. (submit-task channel #'(lambda (x)
  5. (* x x))
  6. 10)
  7. (submit-task channel #'(lambda (y)
  8. (* y y y))
  9. 10)
  10. (submit-task channel #'(lambda (z)
  11. (* z z z z))
  12. 10)
  13. (dotimes (i 3 res)
  14. (push (receive-result channel) res))))

输出如下:

  1. LPARALLEL-USER> (dotimes (i 3)
  2. (print (test-basic-channel-multiple-tasks)))
  3. (100 1000 10000)
  4. (100 1000 10000)
  5. (10000 1000 100)
  6. NIL

为了将消息在运行中的线程中传递,lparallel 也支持创建阻塞队列。队列的创建使用的是 lparallel.queue:make-queue

实用的队列函数:

  • lparallel.queue:make-queue: create a FIFO blocking queue
  • lparallel.queue:push-queue: insert an element into the queue
  • lparallel.queue:pop-queue: pop an item from the queue
  • lparallel.queue:peek-queue: inspect value without popping it
  • lparallel.queue:queue-count: the number of entries in the queue
  • lparallel.queue:queue-full-p: check if the queue is full
  • lparallel.queue:queue-empty-p:check if the queue is empty
  • lparallel.queue:with-locked-queue: lock the queue during access

下面是展示基础队列的属性的 demo:

  1. (defun test-queue-properties ()
  2. (let ((queue (make-queue :fixed-capacity 5)))
  3. (loop
  4. when (queue-full-p queue)
  5. do (return)
  6. do (push-queue (random 100) queue))
  7. (print (queue-full-p queue))
  8. (loop
  9. when (queue-empty-p queue)
  10. do (return)
  11. do (print (pop-queue queue)))
  12. (print (queue-empty-p queue)))
  13. nil)

执行后的结果为:

  1. LPARALLEL-USER> (test-queue-properties)
  2. T
  3. 17
  4. 51
  5. 55
  6. 42
  7. 82
  8. T
  9. NIL

注:lparallel.queue:make-queue 是个通用接口,其后面实际上有多种类型队列。比如说,在上个例子中,基于 :fixed-capacify 关键词参数,真正的队列类型是 lparallel.vector-queue

文档中没有指明要给 lparallel.queue:make-queue 传递什么参数,因此可以用下面的方法来确定:

  1. LPARALLEL-USER> (describe 'lparallel.queue:make-queue)
  2. LPARALLEL.QUEUE:MAKE-QUEUE
  3. [symbol]
  4. MAKE-QUEUE names a compiled function:
  5. Lambda-list: (&REST ARGS)
  6. Derived type: FUNCTION
  7. Documentation:
  8. Create a queue.
  9. The queue contents may be initialized with the keyword argument
  10. `initial-contents'.
  11. By default there is no limit on the queue capacity. Passing a
  12. `fixed-capacity' keyword argument limits the capacity to the value
  13. passed. `push-queue' will block for a full fixed-capacity queue.
  14. Source file: /Users/z0ltan/quicklisp/dists/quicklisp/software/lparallel-20160825-git/src/queue.lisp
  15. MAKE-QUEUE has a compiler-macro:
  16. Source file: /Users/z0ltan/quicklisp/dists/quicklisp/software/lparallel-20160825-git/src/queue.lisp
  17. ; No value

如上所示: lparallel.queue:make-queue 支持两个关键词参数::fixed-capacityinitial-contents

现在,如果给定参数为 :fixed-capacity,那么队列的实际类型是 lparallel.vector-queue。如果没有给定这个关键词参数,队列的默认类型是 lparallel.cons-queue(大小不限的队列),如下面代码所展示的那样:

  1. (defun check-queue-types ()
  2. (let ((queue-one (make-queue :fixed-capacity 5))
  3. (queue-two (make-queue)))
  4. (format t "queue-one is of type: ~a~%" (type-of queue-one))
  5. (format t "queue-two is of type: ~a~%" (type-of queue-two))))
  6. LPARALLEL-USER> (check-queue-types)
  7. queue-one is of type: VECTOR-QUEUE
  8. queue-two is of type: CONS-QUEUE
  9. NIL

当然,你总是能指定创建示例队列的类型,但如果可能的话,最好还是坚持使用通用接口然后让库来创建适合的类型。

现在,来看看下面的代码吧:

  1. (defun test-basic-queue ()
  2. (let ((queue (make-queue))
  3. (channel (make-channel))
  4. (res '()))
  5. (submit-task channel #'(lambda ()
  6. (loop for entry = (pop-queue queue)
  7. when (queue-empty-p queue)
  8. do (return)
  9. do (push (* entry entry) res))))
  10. (dotimes (i 100)
  11. (push-queue i queue))
  12. (receive-result channel)
  13. (format t "~{~d ~}~%" res)))

在上面代码中,提交单个任务,该任务重复扫描队列,直到队列为空,提出(pop)其中的值,让后把提出的值雅茹 res 列表。

输出为:

  1. LPARALLEL-USER> (test-basic-queue)
  2. 9604 9409 9216 9025 8836 8649 8464 8281 8100 7921 7744 7569 7396 7225 7056 6889 6724 6561 6400 6241 6084 5929 5776 5625 5476 5329 5184 5041 4900 4761 4624 4489 4356 4225 4096 3969 3844 3721 3600 3481 3364 3249 3136 3025 2916 2809 2704 2601 2500 2401 2304 2209 2116 2025 1936 1849 1764 1681 1600 1521 1444 1369 1296 1225 1156 1089 1024 961 900 841 784 729 676 625 576 529 484 441 400 361 324 289 256 225 196 169 144 121 100 81 64 49 36 25 16 9 4 1 0
  3. NIL

终止任务

这是就适合使用 lparallel:kill-task 函数。这个函数在任务无响应时使用。lparallel 文档明确的指出杀死任务是最终手段。

创建的任务都会分配到 :default 中。动态属性 *task-category 用来存储这个值,且 task-category* 也可用动态地绑定到不同的值(如下所示):

  1. ;;; kill default tasks
  2. (defun test-kill-all-tasks ()
  3. (let ((channel (make-channel))
  4. (stream *query-io*))
  5. (dotimes (i 10)
  6. (submit-task channel #'(lambda (x)
  7. (sleep (random 10))
  8. (format stream "~d~%" (* x x))) (random 10)))
  9. (sleep (random 2))
  10. (kill-tasks :default)))

运行示例:

  1. LPARALLEL-USER> (test-kill-all-tasks)
  2. 16
  3. 1
  4. 8
  5. WARNING: lparallel: Replacing lost or dead worker.
  6. WARNING: lparallel: Replacing lost or dead worker.
  7. WARNING: lparallel: Replacing lost or dead worker.
  8. WARNING: lparallel: Replacing lost or dead worker.
  9. WARNING: lparallel: Replacing lost or dead worker.
  10. WARNING: lparallel: Replacing lost or dead worker.
  11. WARNING: lparallel: Replacing lost or dead worker.
  12. WARNING: lparallel: Replacing lost or dead worker.

由于创建了 10 个任务,8 个线程都被占用了。当终止掉 :default 中的任务时,所有的线程都会被终止然后重新生成(这个很费资源)。这就要避免使用 lparallel:kill-tasks 的原因。

在上述例子中,所有运行的任务都被终止了,因为这些任务都在 :default 中。假设只需要终止指定的任务,可以在创建任务时指定其 *task-category*,然后在调用 lparallel:kill-tasks 时指定这个 category。

例如,假设有两种任务:计算平方以及计算立方。分别将它们的类别设为 ’squaring-tasks 和 ‘cubing-tasks。然后随机的终止 ‘suqaring-tasks 或 ’cubing-tasks

下面是代码:

  1. ;;; kill tasks of a randomly chosen category
  2. (defun test-kill-random-tasks ()
  3. (let ((channel (make-channel))
  4. (stream *query-io*))
  5. (let ((*task-category* 'squaring-tasks))
  6. (dotimes (i 5)
  7. (submit-task channel #'(lambda (x)
  8. (sleep (random 5))
  9. (format stream "~%[Squaring] ~d = ~d" x (* x x))) i)))
  10. (let ((*task-category* 'cubing-tasks))
  11. (dotimes (i 5)
  12. (submit-task channel #'(lambda (x)
  13. (sleep (random 5))
  14. (format stream "~%[Cubing] ~d = ~d" x (* x x x))) i)))
  15. (sleep 1)
  16. (if (evenp (random 10))
  17. (progn
  18. (print "Killing squaring tasks")
  19. (kill-tasks 'squaring-tasks))
  20. (progn
  21. (print "Killing cubing tasks")
  22. (kill-tasks 'cubing-tasks)))))

以下是执行结果:

  1. LPARALLEL-USER> (test-kill-random-tasks)
  2. [Cubing] 2 = 8
  3. [Squaring] 4 = 16
  4. [Cubing] 4
  5. = [Cubing] 643 = 27
  6. "Killing squaring tasks"
  7. 4
  8. WARNING: lparallel: Replacing lost or dead worker.
  9. WARNING: lparallel: Replacing lost or dead worker.
  10. WARNING: lparallel: Replacing lost or dead worker.
  11. WARNING: lparallel: Replacing lost or dead worker.
  12. [Cubing] 1 = 1
  13. [Cubing] 0 = 0
  14. LPARALLEL-USER> (test-kill-random-tasks)
  15. [Squaring] 1 = 1
  16. [Squaring] 3 = 9
  17. "Killing cubing tasks"
  18. 5
  19. WARNING: lparallel: Replacing lost or dead worker.
  20. WARNING: lparallel: Replacing lost or dead worker.
  21. WARNING: lparallel: Replacing lost or dead worker.
  22. [Squaring] 2 = 4
  23. WARNING: lparallel: Replacing lost or dead worker.
  24. WARNING: lparallel: Replacing lost or dead worker.
  25. [Squaring] 0 = 0
  26. [Squaring] 4 = 16

promises 和 futures

promise 和 futures 支持异步编程。

用 lparallel 的说法(lparallel-speak),lparallel:promise 就是个占位符,用来存放提供的值。 promise 对应是通过 lparallel:promise 创建,赋值的话用 lparallel:fullfill宏。

要检查 promise 有没有赋值,可以用 lparallel:fulfilledp 断言函数。最后,lparallel:force 函数用来提取 promise 中的值。注意,lparallel:forece 函数在没完成前会进行阻塞。

先看个例子来巩固下上面的知识吧:

  1. (defun test-promise ()
  2. (let ((p (promise)))
  3. (loop
  4. do (if (evenp (read))
  5. (progn
  6. (fulfill p 'even-received!)
  7. (return))))
  8. (force p)))

会有以下的输出:

  1. LPARALLEL-USER> (test-promise)
  2. 5
  3. 1
  4. 3
  5. 10
  6. EVEN-RECEIVED!

说明:示例中会一直循环下去,直到输入的是偶数。在循环中 promise 使用 lparallel:fulfill 赋值,随后函数 lparallel:force 会强制返回这个值。

现在,来看个大点的例子吧。假设不想在那里等待 promise 赋值,而是在等待的时候做点其他的事,下面的例子中就将委托 promise 的赋值。

假设有个函数,会计算参数的平方。并且,因为这个参数,会消耗很多时间。从客户端的代码来看,只需要调用函数然后等待返回的平方值。

  1. (defun promise-with-threads ()
  2. (let ((p (promise))
  3. (stream *query-io*)
  4. (n (progn
  5. (princ "Enter a number: ")
  6. (read))))
  7. (format t "In main function...~%")
  8. (bt:make-thread
  9. #'(lambda ()
  10. (sleep (random 10))
  11. (format stream "Inside thread... fulfilling promise~%")
  12. (fulfill p (* n n))))
  13. (bt:make-thread
  14. #'(lambda ()
  15. (loop
  16. when (fulfilledp p)
  17. do (return)
  18. do (progn
  19. (format stream "~d~%" (random 100))
  20. (sleep (* 0.01 (random 100)))))))
  21. (format t "Inside main function, received value: ~d~%" (force p))))

结果如下:

  1. LPARALLEL-USER> (promise-with-threads)
  2. Enter a number: 19
  3. In main function...
  4. 44
  5. 59
  6. 90
  7. 34
  8. 30
  9. 76
  10. Inside thread... fulfilling promise
  11. Inside main function, received value: 361
  12. NIL

说明:上面的例子没什么可说的。创建个 promise 对象 p,然后创建线程,睡眠一段随机的时间,然后在给 promise 赋值。

同时,在主线程中,创建个线程不断地检查 promise 是否被赋值。如果 promise 没有被赋值,就会打印些随机数然后继续检查。一旦 promise 被赋值,就可以在主线程中用 lparallel:force 将 promise 的值提取出来。

这表明 promise 可以被不同的线程赋值,且创建 promise 的代码不需要等待 promise 赋值完成。这点就很重要,像之前提到的, lparallel:force 是个阻塞函数。需要推迟对 promise 值的提取直到 promise 完成赋值。

还有一点需要注意就是,在使用 promise 时,一旦被赋值,调用 lparallel:force 函数返回的值总是一样的。也就是说, promise 只能赋值一次。

例如:

  1. (defun multiple-fulfilling ()
  2. (let ((p (promise)))
  3. (dotimes (i 10)
  4. (fulfill p (random 100))
  5. (format t "~d~%" (force p)))))

输出的结果是:

  1. LPARALLEL-USER> (multiple-fulfilling)
  2. 15
  3. 15
  4. 15
  5. 15
  6. 15
  7. 15
  8. 15
  9. 15
  10. 15
  11. 15
  12. NIL

那么 future 与 promise 有什么区别呢?

lparallel:future 只是并行的 promise,也就是说,不会像 lparallel:promise 那样会在主线程中阻塞。future 是在自己的线程中执行(当然是在 lparallel 库中)。

future 的简单示例:

  1. (defun test-future ()
  2. (let ((f (future
  3. (sleep (random 5))
  4. (print "Hello from future!"))))
  5. (loop
  6. when (fulfilledp f)
  7. do (return)
  8. do (sleep (* 0.01 (random 100)))
  9. (format t "~d~%" (random 100)))
  10. (format t "~d~%" (force f))))

结果如下:

  1. LPARALLEL-USER> (test-future)
  2. 5
  3. 19
  4. 91
  5. 11
  6. Hello from future!
  7. NIL

说明:这和 promise-with-threads 的例子很像。然而,这有两个还是有不同的:第一,lparallel:future 宏有主体。这样的话 future 就能给自己赋值了。也就是说,一旦 future 的主体执行完成,lparallel:fulfilledp 对 future 对象总返回真。

第二,future 自身会通过库创建个单独的进程,所以它不会像 promise-with-threads 示例中的 promise 一样干扰当前的线程(需要指定线程给 promise 赋值,以避免阻塞)。

最有趣的一点是(即便是从 Dan Friedman 和其他人提出的实际理论来看),从概念上讲,future 是一个已赋值的 promise。也就是说,promise 就是 future 在某个时候会产生某种价值的合同,而 future 就是做这个的。

这里的意思是即便是使用 lparallel 库,future 的基本用法也是给 promise 赋值。这就意味着用户不需要创建类似 promise-with-threads 的函数。

现在来看些小例子来说明这点(必须承认,这个例子很刻意)

场景是这样的:需要读入一个数,然后计算这个数的平方。所以将这个工作从其他函数上卸下来,继续关注现有的事。当结果计算出来后,会自动将结果打印在终端,而不是要自己手动操作。

代码看起来是这样的:

  1. ;;; Callback example using promises and futures
  2. (defun callback-promise-future-demo ()
  3. (let* ((p (promise))
  4. (stream *query-io*)
  5. (n (progn
  6. (princ "Enter a number: ")
  7. (read)))
  8. (f (future
  9. (sleep (random 10))
  10. (fulfill p (* n n))
  11. (force (future
  12. (format stream "Square of ~d = ~d~%" n (force p)))))))
  13. (loop
  14. when (fulfilledp f)
  15. do (return)
  16. do (sleep (* 0.01 (random 100))))))

输出如下:

  1. LPARALLEL-USER> (callback-promise-future-demo)
  2. Enter a number: 19
  3. Square of 19 = 361
  4. NIL

说明:好吧,首先,创建个 promise 来生成的平方值。这就是对象 p。输入的值保存在本地变量 n 中。

然后创建个 future 对象 f。future 只计算输入值的平方然后将计算后的结果赋值给 promise。最后,需要它自动将结果打印出来,使用 force 函数提取匿名 future 的值,只输出所展示的字符串。

注意这和 Node 中的环境很像,在 Node 中,将回调函数传给其他函数,了解回调函数是在调用函数执行完后才会被调用。

最后要注意的是,下面的代码也是正常的(即便因为在单个线程上调用阻塞函数 lparallel:force):

  1. (force (future
  2. (format stream "Square of ~d = ~d~%" n (force p))))

总结一下,一般用法是:需要异步计算结果保存到对象时用 promise,future 用来个这些 promise 赋值

同源: Common Lisp 和 lparallel 中的同源函数

同源函数可以说是 lparallel 库存在的原因。这些构造确实是在 lparallel 中提供了并行。但要注意,大部分构造是基于 futures 和 promises 构建的。

简要地说,同源函数只是等价于 Common Lisp 中的行数的并行函数。但是,有极少数同源函数在 Common Lisp 中没有对应的函数。

这时,了解同源函数两个基础特性比较重要:

  • Constructs for fine-grained parallelism: defpun, plet, plet-if, etc.
  • Explicit functions and macros for performing parallel operations -
    pmap, preduce, psort, pdotimes, etc.

can. In this post, we will focus on the second category of cognates.
第一种情况下不用准确的控制操作。主要是库会尽可能地优化和并发这种格式。本文中,关注的是第二类同源函数。

Take, for instance, the cognate function lparallel:pmap is exactly
the same as the Common Lisp equivalent, map, but it runs in
parallel. Let’s demonstrate that through an example.j
举个例子,lparallel:pmap 函数和 map 一摸一样,但前者是并行执行的。用个例子来讲解下吧。

假设有多个随机字符串,其长度是从 3 到 10,然后需要将这些字符串的长度保存到个向量中。

首先创建个 helper 函数,用来生成随机字符串:

  1. (defvar *chars*
  2. (remove-duplicates
  3. (sort
  4. (loop for c across "The quick brown fox jumps over the lazy dog"
  5. when (alpha-char-p c)
  6. collect (char-downcase c))
  7. #'char<)))
  8. (defun get-random-strings (&optional (count 100000))
  9. "generate random strings between lengths 3 and 10"
  10. (loop repeat count
  11. collect
  12. (concatenate 'string (loop repeat (+ 3 (random 8))
  13. collect (nth (random 26) *chars*)))))

以下是 Common Lisp 的 map 版:

  1. ;;; map demo
  2. (defun test-map ()
  3. (map 'vector #'length (get-random-strings 100)))

测试一下:

  1. LPARALLEL-USER> (test-map)
  2. #(7 5 10 8 7 5 3 4 4 10)

等价于 lparallel:pmap 的函数:

  1. ;;;pmap demo
  2. (defun test-pmap ()
  3. (pmap 'vector #'length (get-random-strings 100)))

其作用为:

  1. LPARALLEL-USER> (test-pmap)
  2. #(8 7 6 7 6 4 5 6 5 7)
  3. LPARALLEL-USER>

从上面 test-map 和 test-pmap 中的定义格式来看,lparallel:maplparallel:pmap 的语法是一摸一样(好吧,大致相同,lparallel:pmap 还有一些可选参数)

一些实用的同源函数和宏(除了明确说明的,其他的都是函数)注意同源函数比较多,这里只是在每种中选了一些代表作为例子:

并行版 map:lparallel:pmap.

注意所有的 map 函数(lparallel:pmaplparallel:pmapclparallel:pmapcar等)有两个特殊的关键字参数:

  • :size, specifying the number of elements of the input
    sequence(s) to process, and
  • :parts which specifies the number of parallel parts to divide the
    sequence(s) into.
  1. ;;; pmap - function
  2. (defun test-pmap ()
  3. (let ((numbers (loop for i below 10
  4. collect i)))
  5. (pmap 'vector #'(lambda (x)
  6. (* x x))
  7. :parts (length numbers)
  8. numbers)))

执行示例:

  1. LPARALLEL-USER> (test-pmap)
  2. #(0 1 4 9 16 25 36 49 64 81)

并行版 or:lparallel:por

返回的是参数中第一个非 nil 元素。但是,由于这个宏是并行的,所以返回的元素不一定相同。

  1. ;;; por - macro
  2. (defun test-por ()
  3. (let ((a 100)
  4. (b 200)
  5. (c nil)
  6. (d 300))
  7. (por a b c d)))

执行示例:

  1. LPARALLEL-USER> (dotimes (i 10)
  2. (print (test-por)))
  3. 300
  4. 300
  5. 100
  6. 100
  7. 100
  8. 300
  9. 100
  10. 100
  11. 100
  12. 100
  13. NIL

常规的 or 操作符中,总是返回第一个非 nil 元素,即 100。

并行版 dotimes:lparallel:pdotimes

注,这个宏也有可选参数 :parts

  1. ;;; pdotimes - macro
  2. (defun test-pdotimes ()
  3. (pdotimes (i 5)
  4. (declare (ignore i))
  5. (print (random 100))))

运行示例:

  1. LPARALLEL-USER> (test-pdotimes)
  2. 39
  3. 29
  4. 81
  5. 42
  6. 56
  7. NIL

并行版 funcall:lparallel:pfuncall

  1. ;;; pfuncall - macro
  2. (defun test-pfuncall ()
  3. (pfuncall #'* 1 2 3 4 5))

运行示例:

  1. LPARALLEL-USER> (test-pfuncall)
  2. 120

并行版 reduce:lparallel:preduce

这个重要函数也有两个可选参数::parts(很参数名字意思一样)和 :recurse。如果 :recuse 为非 nil,会递归的将 lparallel:preduce 到参数上,否则默认使用 reduce。

  1. ;;; preduce - function
  2. (defun test-preduce ()
  3. (let ((numbers (loop for i from 1 to 100
  4. collect i)))
  5. (preduce #'+
  6. numbers
  7. :parts (length numbers)
  8. :recurse t)))

运行示例:

  1. LPARALLEL-USER> (test-preduce)
  2. 5050

并行版 remove-if-not:lparallel:premove-if-not

这个等价于 “filter”。

  1. ;;; premove-if-not
  2. (defun test-premove-if-not ()
  3. (let ((numbers (loop for i from 1 to 100
  4. collect i)))
  5. (premove-if-not #'evenp numbers)))

运行示例:

  1. LPARALLEL-USER> (test-premove-if-not)
  2. (2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38 40 42 44 46 48 50 52 54
  3. 56 58 60 62 64 66 68 70 72 74 76 78 80 82 84 86 88 90 92 94 96 98 100)

并行版 every:lparallel:pevery

  1. ;;; pevery - function
  2. (defun test-pevery ()
  3. (let ((numbers (loop for i from 1 to 100
  4. collect i)))
  5. (list (pevery #'evenp numbers)
  6. (pevery #'integerp numbers))))

运行示例:

  1. LPARALLEL-USER> (test-pevery)
  2. (NIL T)

在这个例子中,执行了两部检查:第一,1 到 100 范围内的数是否都是偶数,第二,在这个范围内的数是否都是整型。

并行版 count: lparallel:count

  1. ;;; pcount - function
  2. (defun test-pcount ()
  3. (let ((chars "The quick brown fox jumps over the lazy dog"))
  4. (pcount #\e chars)))

运行示例:

  1. LPARALLEL-USER> (test-pcount)
  2. 3

并发版 sort:lparallel:psort

  1. ;;; psort - function
  2. (defstruct person
  3. name
  4. age)
  5. (defun test-psort ()
  6. (let* ((names (list "Rich" "Peter" "Sybil" "Basil" "Candy" "Slava" "Olga"))
  7. (people (loop for name in names
  8. collect (make-person :name name :age (+ (random 20) 20)))))
  9. (print "Before sorting...")
  10. (print people)
  11. (fresh-line)
  12. (print "After sorting...")
  13. (psort
  14. people
  15. #'(lambda (x y)
  16. (< (person-age x)
  17. (person-age y)))
  18. :test #'=)))

运行示例:

  1. LPARALLEL-USER> (test-psort)
  2. "Before sorting..."
  3. (#S(PERSON :NAME "Rich" :AGE 38) #S(PERSON :NAME "Peter" :AGE 24)
  4. #S(PERSON :NAME "Sybil" :AGE 20) #S(PERSON :NAME "Basil" :AGE 22)
  5. #S(PERSON :NAME "Candy" :AGE 23) #S(PERSON :NAME "Slava" :AGE 37)
  6. #S(PERSON :NAME "Olga" :AGE 33))
  7. "After sorting..."
  8. (#S(PERSON :NAME "Sybil" :AGE 20) #S(PERSON :NAME "Basil" :AGE 22)
  9. #S(PERSON :NAME "Candy" :AGE 23) #S(PERSON :NAME "Peter" :AGE 24)
  10. #S(PERSON :NAME "Olga" :AGE 33) #S(PERSON :NAME "Slava" :AGE 37)
  11. #S(PERSON :NAME "Rich" :AGE 38))

在本例中,首先定义了个 person 结构体,来保存有关人的信息。然后创建了包含 7 个人(随机生成 20 到 39 之间的年龄)的列表。最后,对年龄进行升序排序。

异常处理

想要了解 lparallel 是怎么处理异常的(提示:用 lparallel:task-handler-bind),参考 https://z0ltan.wordpress.com/2016/09/10/basic-concurrency-and-parallelism-in-common-lisp-part-4b-parallelism-using-lparallel-error-handling/

在 Slime 中监视控制线程

M-x slime-list-threads (也可以通过 slime-selector 进行访问,快捷键是 t) 会列出运行中的线程的名字以及状态。

当前行的进程可以用快捷键 k 杀掉,或者如果需要杀掉多个线程的话,可以选中多行然后按 k 就可以杀掉所选中的线程。

快捷键 g 会更新线程列表,但当你有很多线程要在启动中和停止中的话,总是按 g 很麻烦,因此就有个变量 slime-threads-update-interval,把这个变量设为 X 的话,每隔 X 秒,线程列表会自动更新,比较合适的值是 0.5。

感谢 Slime tips.

参考

当然,还有更多的函数、对象以及通用方法(使用lparallel库执行并行计算)。这篇文章只涉及了一点皮毛而已。不过,本文中讲解是的通用流程,要进一步阅读,以下资料或许会有所帮助: