https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/

线程基本介绍

几乎所有操作系统都支持
几乎所有语言都有相应的多线程机制

线程是独立的处理流程,可以和系统的其他线程并行或者并发地执行。
多线程可以共享数据和资源,利用所谓的共享内存空间。

每一个线程基本元素

  • 程序计数器
  • 寄存器

与同一进程的其他线程共享的资源基本上包括

  • 数据
  • 系统资源

线程的状态大体上可以分为

  • ready
  • running
  • blocked

多线程编程一般使用共享内容空间进行线程间的通讯。这就使用管理内容空间成为多线程编程的重点和难点。

python 线程模块

Python通过标准库的 threading 模块来管理线程。这个模块提供了很多不错的特性,让线程变得无比简单。实际上,线程模块提供了几种同时运行的机制,实现起来非常简单。

线程模块的主要组件如下:

  • 线程对象 Thread objects
  • Lock对象 Lock objects
  • Rlock对象 Rlock objects reentrant
  • 信号对象 Semaphore objects
  • 条件对象 Condition objects
  • 事件对象 Event objects
  • Timer objects
  • 栅栏对象 Barrier objects

image.png

Thread objects

定义一个线程

最简单的一个方法是,用一个目标函数实例化一个 Thread类,然后调用它的 start方法启动它。

  1. class threading.Thread(
  2. group=None,
  3. target=None, # 线程启动时要执行的函数,可以简单理解为一个线程就是一个函数
  4. name=None, # 可以指定线程的名字
  5. args=(), # 指定传递给target函数的参数
  6. kwargs={} # 同上
  7. )

线程 demo

  1. import threading
  2. # def worker():
  3. # print('welcome shenzhen')
  4. # print('thread over')
  5. # if __name__ == "__main__":
  6. # t = threading.Thread(target=worker)
  7. # t.start()
  8. def function(i):
  9. print(f"function called by thread {i} - {i * i}\n")
  10. return
  11. threads = []
  12. if __name__ == "__main__":
  13. for i in range(100):
  14. t = threading.Thread(target=function, args=(i, ))
  15. threads.append(t)
  16. t.start()
  17. # t.join() # main线程等待 线程执行;然后开始新的循环
  18. # 开启和关闭 t.join() 输出不一样,请思考为什么(参考文档)
  19. # 使用idle运行, ide里可能看不出效果
  20. '''
  21. join(timeout=None)
  22. Wait until the thread terminates.
  23. This blocks the calling thread until the thread whose join() method is called terminates
  24. – either normally or through an unhandled exception – or until the optional timeout occurs.
  25. '''

python的线程没有优先级, 线程组的概念(构造函数源码有预留group参数,以后可能实现ThreadGroup类来支持);也不能被销毁、停止、挂起,那也就没有中断了。

threading的属性和方法

名称 含义
current_thread() 返回当前线程对象
main_thread() 返回主线程对象
active_count() 当前处理alive状态的线程个数
enumerate() 返回所有活着的线程的列表,不包括已经终止的和未开始的。
get_ident() 返回当前线程的id,非0整数

返回当前线程对象

  1. import threading
  2. import time
  3. def worker():
  4. time.sleep(0.5)
  5. print(f'welcome shenzhen, current thread is {threading.currentThread()}')
  6. if __name__ == "__main__":
  7. for i in range(10):
  8. t = threading.Thread(target=worker)
  9. t.start()
  10. """
  11. (pythonparallel-jU7W1wcf) λ python pythreaddemo.py
  12. welcome shenzhen, current thread is <Thread(Thread-1, started 1392)>
  13. welcome shenzhen, current thread is <Thread(Thread-4, started 14376)>
  14. welcome shenzhen, current thread is <Thread(Thread-2, started 15436)>
  15. welcome shenzhen, current thread is <Thread(Thread-7, started 15860)>
  16. welcome shenzhen, current thread is <Thread(Thread-5, started 2928)>
  17. welcome shenzhen, current thread is <Thread(Thread-8, started 15772)>
  18. welcome shenzhen, current thread is <Thread(Thread-10, started 1312)>
  19. welcome shenzhen, current thread is <Thread(Thread-6, started 11164)>
  20. welcome shenzhen, current thread is <Thread(Thread-9, started 12056)>
  21. welcome shenzhen, current thread is <Thread(Thread-3, started 11276)>
  22. """
  23. """
  24. # Thread 的 repr 方法
  25. def __repr__(self):
  26. assert self._initialized, "Thread.__init__() was not called"
  27. status = "initial"
  28. if self._started.is_set():
  29. status = "started"
  30. self.is_alive() # easy way to get ._is_stopped set when appropriate
  31. if self._is_stopped:
  32. status = "stopped"
  33. if self._daemonic:
  34. status += " daemon"
  35. if self._ident is not None:
  36. status += " %s" % self._ident
  37. return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status)
  38. """

确认当前线程

使用参数来确认或者命名是笨拙且没有必要的。

  1. import threading
  2. import time
  3. def first_fun():
  4. print(threading.currentThread().getName() + str(' is Starting '))
  5. time.sleep(2)
  6. print(threading.currentThread().getName() + str(' is Exiting '))
  7. def second_fun():
  8. print(threading.currentThread().getName() + str(' is Starting '))
  9. time.sleep(2)
  10. print(threading.currentThread().getName() + str(' is Exiting '))
  11. def third_fun():
  12. print(threading.currentThread().getName() + str(' is Starting '))
  13. time.sleep(2)
  14. print(threading.currentThread().getName() + str(' is Exiting '))
  15. if __name__ == "__main__":
  16. t1 = threading.Thread(name='first_func', target=first_fun)
  17. t2 = threading.Thread(name='second_func', target=second_fun)
  18. t3 = threading.Thread(target=third_fun)
  19. t1.start()
  20. t2.start()
  21. t3.start()
  22. t1.join()
  23. t2.join()
  24. t3.join()

实现一个线程

使用 threading模块实现一个新的线程,需要下面3步

  • 定义一个 Thread 类的子类(继承Thread类)
  • 重写 __init_(self [, args]) 方法,可以添加自己的参数
  • 重写 run(self, [, args] 方法,实现线程要做的事
  1. import threading
  2. import time
  3. exitFlag = 0
  4. class myThread(threading.Thread):
  5. def __init__(self, threadID, name, counter):
  6. super().__init__()
  7. self.threadID = threadID
  8. self.name = name
  9. self.counter = counter
  10. # return super().__init__(threadID, name, counter)
  11. def run(self):
  12. print(f"Starting {self.name}")
  13. print_time(self.name, self.counter, 5)
  14. print(f"Exiting {self.name}")
  15. def print_time(threadName, counter, delay):
  16. while counter:
  17. if exitFlag:
  18. # # python3 使用以下代码
  19. # import _thread
  20. # _thread.exit()
  21. thread.exit()
  22. time.sleep(delay)
  23. print(f"{threadName}: {time.ctime(time.time())}")
  24. counter -= 1
  25. if __name__ == "__main__":
  26. t1 = myThread(1, "Thread-1", 1)
  27. t2 = myThread(2, "Thread-2", 2)
  28. t1.start()
  29. t2.start()
  30. t1.join()
  31. t2.join()
  32. print("EXiting Main Thread")

输出

  1. (pythonparallel-jU7W1wcf) D:\projects\pythoncode\pythonparallel>python thirddemo.py
  2. Starting Thread-1
  3. Starting Thread-2
  4. Thread-1: Sun Jun 23 10:47:50 2019
  5. Exiting Thread-1
  6. Thread-2: Sun Jun 23 10:47:50 2019
  7. Thread-2: Sun Jun 23 10:47:55 2019 # 线程2,第二次运行 sleep(5) 秒后,输出当前时间
  8. Exiting Thread-2
  9. EXiting Main Thread

线程同步- lock

当两个或者以上共享内存的操作发生在并发线程中时,并且至少一个可以写数据,又没有同步机制的条件下,就会产生竞争条件,可能会导致执行无效代码、bug、或者异常行为。

竞争条件最简单的解决方法就是使用锁。

锁的操作非常简单,当一个线程需要访问部分共享内存时,它必须先获得锁才能访问。此线程对这部分共享资源使用完成之后,该线程必须释放锁,然后其他线程就可以拿到这个锁并访问这部分资源了。

保证同一时刻只有一个线程允许访问共享内存。

并行 -> 串行

带来的问题:

  • 死锁(两个以上线程争抢一个锁时)

如下事例

  1. import threading
  2. shared_resource_with_lock = 0
  3. shared_resource_with_no_lock = 0
  4. COUNT = 1000000
  5. shared_resource_lock = threading.Lock()
  6. def increment_with_lock():
  7. global shared_resource_with_lock
  8. for i in range(COUNT):
  9. shared_resource_lock.acquire()
  10. shared_resource_with_lock += 1
  11. shared_resource_lock.release()
  12. def decrement_with_lock():
  13. global shared_resource_with_lock
  14. for i in range(COUNT):
  15. shared_resource_lock.acquire()
  16. shared_resource_with_lock -= 1
  17. shared_resource_lock.release()
  18. def increment_without_lock():
  19. global shared_resource_with_no_lock
  20. for i in range(COUNT):
  21. shared_resource_with_no_lock += 1
  22. def decrement_without_lock():
  23. global shared_resource_with_no_lock
  24. for i in range(COUNT):
  25. shared_resource_with_no_lock -= 1
  26. if __name__ == "__main__":
  27. t1 = threading.Thread(target=increment_with_lock)
  28. t2 = threading.Thread(target=decrement_with_lock)
  29. t3 = threading.Thread(target=increment_without_lock)
  30. t4 = threading.Thread(target=decrement_without_lock)
  31. t1.start()
  32. t2.start()
  33. t3.start()
  34. t4.start()
  35. t1.join()
  36. t2.join()
  37. t3.join()
  38. t4.join()
  39. print(f"the value of shared variable with lock management is {shared_resource_with_lock}")
  40. print(f"the value of shared variable with race condition is {shared_resource_with_no_lock}")
  41. '''
  42. D:\projects\pythoncode\pythonparallel>python lockdemo.py
  43. the value of shared variable with lock management is 0 # 正确,两个线程并行了同样次数的操作
  44. the value of shared variable with race condition is 273145 # 异常,修改全局对象,未使用锁,结果不确定
  45. '''

总结
锁有两种状态

  • locked
  • unlocked

锁有两个方法来操作

  • acquire()
  • release()

需要遵循以下规则:

  • 如果状态是unlocked,可以调用 acquire方法将状态locked
  • 如果状态是locked,acquire() 会被block直到另一个线路调用 release()释放锁
  • 如果状态是 unlocked,调用 release() 将导致 RuntimeError 异常
  • 如果状态是 locked, 可以调用 release() 将状态改为 unlocked

锁是一种保守的方法。