https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/
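Introduction to threads
Almost every operating system supports threads.
Almost every programming language provides a corresponding multithreading mechanism.
A thread is an independent flow of execution that can run in parallel or concurrently with the other threads in the system.
Threads of the same process can share data and resources through what is called a shared memory space.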
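Each thread has its own basic elements:
- program counter
- registers
- stack
The resources shared with the other threads of the same process essentially include:
- data
- system resources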
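As a minimal sketch of the split between private and shared state, the example below lets several threads append to one list. The names `collect_squares`, `worker`, and `results` are illustrative only. It relies on `list.append` being safe to call from several threads in CPython; more complex updates need explicit synchronization.

```python
import threading


def collect_squares(n):
    """Run n threads that all append to the same list object."""
    results = []  # shared data: one object visible to every thread

    def worker(i):
        # i lives on this thread's own stack; results is shared by all threads
        results.append(i * i)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    # The order of the appends is not guaranteed, so sort before printing.
    print(sorted(collect_squares(5)))
```

Each thread sees the same `results` object, while its own argument `i` stays private to it.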
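A thread's state is broadly one of:
- ready
- running
- blocked
Multithreaded programs usually communicate between threads through the shared memory space. This makes managing that memory space both the central concern and the main difficulty of multithreaded programming.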
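The Python threading module
Python manages threads through the threading module of the standard library. The module offers a number of useful features that make thread-based programming considerably easier; in particular, it provides several synchronization mechanisms that are simple to use.
The main components of the threading module are:
- Thread objects
- Lock objects
- RLock objects (reentrant locks)
- Semaphore objects
- Condition objects
- Event objects
- Timer objects
- Barrier objects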
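To give a feel for one of the components listed above, here is a small sketch that uses a `Semaphore` to cap how many threads run a section at once. The function `run_limited` and its counters are hypothetical names chosen for illustration, and `time.sleep` stands in for real work.

```python
import threading
import time


def run_limited(num_workers=6, limit=2):
    """Return the peak number of workers inside the guarded section at once."""
    slots = threading.Semaphore(limit)  # at most `limit` threads may hold a slot
    counter_lock = threading.Lock()     # protects `active` and `peak`
    active = 0
    peak = 0

    def worker():
        nonlocal active, peak
        with slots:                      # blocks while all slots are taken
            with counter_lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.01)             # stand-in for real work
            with counter_lock:
                active -= 1

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak


if __name__ == "__main__":
    print(run_limited())  # never exceeds the limit of 2
```

The semaphore bounds concurrency, while the separate `Lock` keeps the bookkeeping counters consistent.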

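Thread objects
Defining a thread
The simplest way is to instantiate the Thread class with a target function and then call its start() method to launch it.

    class threading.Thread(
        group=None,
        target=None,  # function to run when the thread starts; roughly, one thread runs one function
        name=None,    # optional name for the thread
        args=(),      # positional arguments passed to target
        kwargs={},    # keyword arguments passed to target
    )
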
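The constructor parameters can be seen together in a short sketch. The `greet` function and the `results` list are assumptions made up for this illustration; only `target`, `name`, `args`, and `kwargs` come from the signature above.

```python
import threading


def greet(greeting, name="world", results=None):
    """Build a message that records which thread produced it."""
    message = f"{greeting}, {name} (from {threading.current_thread().name})"
    if results is not None:
        results.append(message)  # hand the result back through shared data
    return message


def run_greeter():
    results = []
    t = threading.Thread(
        target=greet,                                    # function the thread runs
        name="greeter",                                  # thread name
        args=("hello",),                                 # positional arguments
        kwargs={"name": "shenzhen", "results": results}, # keyword arguments
    )
    t.start()
    t.join()  # wait for the thread, so results is filled in
    return results


if __name__ == "__main__":
    print(run_greeter())
```

Because the return value of `target` is discarded, the sketch passes a list in through `kwargs` to collect the result.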
Thread demo

    import threading

    # A simpler first demo, kept for reference:
    # def worker():
    #     print('welcome shenzhen')
    #     print('thread over')
    #
    # if __name__ == "__main__":
    #     t = threading.Thread(target=worker)
    #     t.start()

    def function(i):
        print(f"function called by thread {i} - {i * i}\n")
        return

    threads = []

    if __name__ == "__main__":
        for i in range(100):
            t = threading.Thread(target=function, args=(i, ))
            threads.append(t)
            t.start()
            # t.join()  # the main thread waits for this thread to finish, then starts the next iteration

    # The output differs depending on whether t.join() is enabled; think about why (see the documentation).
    # Run this with IDLE; the effect may not be visible in an IDE.

    '''
    join(timeout=None)
    Wait until the thread terminates.
    This blocks the calling thread until the thread whose join() method is called terminates
    – either normally or through an unhandled exception – or until the optional timeout occurs.
    '''

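Python threads have no priorities and no thread groups (the constructor reserves the group parameter so that a ThreadGroup class could be implemented later). A thread also cannot be destroyed, stopped, or suspended from outside, so it cannot be interrupted either.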
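Since a thread cannot be stopped from outside, a common workaround is cooperative: the thread checks a flag and returns on its own. The sketch below uses `threading.Event` for the flag. The names `run_stoppable`, `stop_requested`, and `ticks` are hypothetical, and the timings are arbitrary.

```python
import threading
import time


def run_stoppable(run_for=0.05):
    """Start a worker, ask it to stop after run_for seconds, and wait for it."""
    stop_requested = threading.Event()
    ticks = []

    def worker():
        # The worker decides when to exit by checking the flag each iteration.
        while not stop_requested.is_set():
            ticks.append(time.monotonic())
            stop_requested.wait(0.005)  # sleeps, but wakes early once the flag is set

    t = threading.Thread(target=worker)
    t.start()
    time.sleep(run_for)
    stop_requested.set()  # a request, not a forced stop
    t.join()
    return t.is_alive(), len(ticks)


if __name__ == "__main__":
    alive, tick_count = run_stoppable()
    print(f"alive after join: {alive}, iterations: {tick_count}")
```

The worker only stops at the points where it checks the event, so long blocking calls inside the loop delay the shutdown.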
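Functions of the threading module
| Name | Meaning |
|---|---|
| current_thread() | Returns the current Thread object |
| main_thread() | Returns the main Thread object |
| active_count() | Number of threads currently alive |
| enumerate() | Returns a list of all threads currently alive; terminated threads and threads not yet started are excluded |
| get_ident() | Returns the identifier of the current thread, a nonzero integer |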
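The functions in the table can be tried out with a short sketch. The helper `inspect_threads` and the thread name `inspected` are invented for this example; an `Event` keeps the worker alive long enough to be observed.

```python
import threading


def inspect_threads():
    """Observe one worker thread through the module-level helper functions."""
    release = threading.Event()
    idents = []

    def worker():
        idents.append(threading.get_ident())  # identifier of the worker thread
        release.wait()                        # stay alive until the caller has looked

    t = threading.Thread(target=worker, name="inspected")
    before = threading.active_count()
    t.start()
    during = threading.active_count()                  # one more thread is alive now
    names = [th.name for th in threading.enumerate()]  # names of all alive threads
    release.set()
    t.join()
    return before, during, names, idents


if __name__ == "__main__":
    before, during, names, idents = inspect_threads()
    print(before, during, names, idents)
    print(threading.current_thread() is threading.main_thread())
```

A thread that has been created but not started, or that has already finished, does not show up in `enumerate()` or `active_count()`.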
Getting the current thread object

    import threading
    import time

    def worker():
        time.sleep(0.5)
        # current_thread() replaces the deprecated camelCase alias currentThread()
        print(f'welcome shenzhen, current thread is {threading.current_thread()}')

    if __name__ == "__main__":
        for i in range(10):
            t = threading.Thread(target=worker)
            t.start()

    """
    (pythonparallel-jU7W1wcf) λ python pythreaddemo.py
    welcome shenzhen, current thread is <Thread(Thread-1, started 1392)>
    welcome shenzhen, current thread is <Thread(Thread-4, started 14376)>
    welcome shenzhen, current thread is <Thread(Thread-2, started 15436)>
    welcome shenzhen, current thread is <Thread(Thread-7, started 15860)>
    welcome shenzhen, current thread is <Thread(Thread-5, started 2928)>
    welcome shenzhen, current thread is <Thread(Thread-8, started 15772)>
    welcome shenzhen, current thread is <Thread(Thread-10, started 1312)>
    welcome shenzhen, current thread is <Thread(Thread-6, started 11164)>
    welcome shenzhen, current thread is <Thread(Thread-9, started 12056)>
    welcome shenzhen, current thread is <Thread(Thread-3, started 11276)>
    """

    """
    # The __repr__ method of Thread
    def __repr__(self):
        assert self._initialized, "Thread.__init__() was not called"
        status = "initial"
        if self._started.is_set():
            status = "started"
        self.is_alive() # easy way to get ._is_stopped set when appropriate
        if self._is_stopped:
            status = "stopped"
        if self._daemonic:
            status += " daemon"
        if self._ident is not None:
            status += " %s" % self._ident
        return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status)
    """

Identifying the current thread
Passing an argument just to identify or name a thread is clumsy and unnecessary: every Thread object already has a name, which can be read through current_thread().name.

    import threading
    import time

    def first_fun():
        # .name replaces the deprecated getName() method
        print(threading.current_thread().name + ' is Starting ')
        time.sleep(2)
        print(threading.current_thread().name + ' is Exiting ')

    def second_fun():
        print(threading.current_thread().name + ' is Starting ')
        time.sleep(2)
        print(threading.current_thread().name + ' is Exiting ')

    def third_fun():
        print(threading.current_thread().name + ' is Starting ')
        time.sleep(2)
        print(threading.current_thread().name + ' is Exiting ')

    if __name__ == "__main__":
        t1 = threading.Thread(name='first_func', target=first_fun)
        t2 = threading.Thread(name='second_func', target=second_fun)
        t3 = threading.Thread(target=third_fun)  # no name given, so a default of the form Thread-N is assigned
        t1.start()
        t2.start()
        t3.start()
        t1.join()
        t2.join()
        t3.join()

Implementing a thread
Implementing a new thread with the threading module takes three steps:
- Define a subclass of the Thread class (inherit from Thread)
- Override the __init__(self [, args]) method to add your own arguments
- Override the run(self [, args]) method to implement what the thread should do

    import _thread
    import threading
    import time

    exitFlag = 0

    class myThread(threading.Thread):
        def __init__(self, threadID, name, counter):
            # Initialize the base class first, then store the custom arguments
            super().__init__()
            self.threadID = threadID
            self.name = name
            self.counter = counter

        def run(self):
            print(f"Starting {self.name}")
            print_time(self.name, self.counter, 5)
            print(f"Exiting {self.name}")

    def print_time(threadName, counter, delay):
        while counter:
            if exitFlag:
                # In Python 3 the Python 2 thread module is named _thread
                _thread.exit()
            time.sleep(delay)
            print(f"{threadName}: {time.ctime(time.time())}")
            counter -= 1

    if __name__ == "__main__":
        t1 = myThread(1, "Thread-1", 1)
        t2 = myThread(2, "Thread-2", 2)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        print("EXiting Main Thread")

Output

    (pythonparallel-jU7W1wcf) D:\projects\pythoncode\pythonparallel>python thirddemo.py
    Starting Thread-1
    Starting Thread-2
    Thread-1: Sun Jun 23 10:47:50 2019
    Exiting Thread-1
    Thread-2: Sun Jun 23 10:47:50 2019
    Thread-2: Sun Jun 23 10:47:55 2019  # Thread-2's second iteration: prints the current time after another sleep(5)
    Exiting Thread-2
    EXiting Main Thread

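Thread synchronization: Lock
A race condition arises when two or more concurrent threads operate on the same shared memory, at least one of them writes, and there is no synchronization mechanism. It can lead to invalid code being executed, bugs, or other unexpected behavior.
The simplest way to resolve a race condition is to use a lock.
Using a lock is straightforward: a thread that needs to access a piece of shared memory must first acquire the lock. Once the thread has finished with the shared resource, it must release the lock so that other threads can acquire it and access the resource.
This guarantees that only one thread at a time is allowed to access the shared memory.
In effect, the lock turns parallel access into serial access.
Problems this introduces:
- Deadlock (for example, when two or more threads each hold one lock and wait for a lock held by another)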
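The deadlock scenario can be sketched safely by giving the second `acquire()` a timeout, so the program reports the stalemate instead of hanging. Everything here (`crossed_lock_order`, the two barriers, the timeout value) is an illustrative assumption, not part of the original notes.

```python
import threading


def crossed_lock_order(timeout=0.05):
    """Two threads take two locks in opposite order; report who got a second lock."""
    lock_a = threading.Lock()
    lock_b = threading.Lock()
    both_hold_first = threading.Barrier(2)  # passes once each thread owns its first lock
    both_gave_up = threading.Barrier(2)     # passes once each thread finished its attempt
    got_second = {}

    def worker(name, first, second):
        with first:
            both_hold_first.wait()
            # A plain second.acquire() would block forever here: a deadlock.
            acquired = second.acquire(timeout=timeout)
            got_second[name] = acquired
            if acquired:
                second.release()
            both_gave_up.wait()  # keep `first` until the other thread has given up too

    t1 = threading.Thread(target=worker, args=("t1", lock_a, lock_b))
    t2 = threading.Thread(target=worker, args=("t2", lock_b, lock_a))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    return got_second


if __name__ == "__main__":
    print(crossed_lock_order())  # both values are False
```

One usual way to avoid this situation is to have every thread acquire the locks in the same order.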
Consider the following example:

    import threading

    shared_resource_with_lock = 0
    shared_resource_with_no_lock = 0
    COUNT = 1000000
    shared_resource_lock = threading.Lock()

    def increment_with_lock():
        global shared_resource_with_lock
        for i in range(COUNT):
            shared_resource_lock.acquire()
            shared_resource_with_lock += 1
            shared_resource_lock.release()

    def decrement_with_lock():
        global shared_resource_with_lock
        for i in range(COUNT):
            shared_resource_lock.acquire()
            shared_resource_with_lock -= 1
            shared_resource_lock.release()

    def increment_without_lock():
        global shared_resource_with_no_lock
        for i in range(COUNT):
            shared_resource_with_no_lock += 1

    def decrement_without_lock():
        global shared_resource_with_no_lock
        for i in range(COUNT):
            shared_resource_with_no_lock -= 1

    if __name__ == "__main__":
        t1 = threading.Thread(target=increment_with_lock)
        t2 = threading.Thread(target=decrement_with_lock)
        t3 = threading.Thread(target=increment_without_lock)
        t4 = threading.Thread(target=decrement_without_lock)
        t1.start()
        t2.start()
        t3.start()
        t4.start()
        t1.join()
        t2.join()
        t3.join()
        t4.join()
        print(f"the value of shared variable with lock management is {shared_resource_with_lock}")
        print(f"the value of shared variable with race condition is {shared_resource_with_no_lock}")

    # D:\projects\pythoncode\pythonparallel>python lockdemo.py
    # the value of shared variable with lock management is 0  # correct: both threads performed the same number of operations
    # the value of shared variable with race condition is 273145  # wrong: the global is modified without a lock, so the result is nondeterministic

Summary
A lock has two states:
- locked
- unlocked
A lock is operated through two methods:
- acquire()
- release()
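The following rules apply:
- If the state is unlocked, acquire() changes it to locked
- If the state is locked, acquire() blocks until another thread calls release() to free the lock
- If the state is unlocked, calling release() raises a RuntimeError
- If the state is locked, release() changes it to unlocked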
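The rules above can be checked in a single thread with a short sketch. The function name `lock_rules` is hypothetical. A non-blocking `acquire(blocking=False)` stands in for the blocking case, so the example returns instead of waiting forever.

```python
import threading


def lock_rules():
    """Walk a Lock through its two states and record what each call reports."""
    lock = threading.Lock()
    observed = []
    observed.append(lock.locked())                 # False: a new lock starts unlocked
    observed.append(lock.acquire())                # True: unlocked -> locked
    observed.append(lock.acquire(blocking=False))  # False: already locked, returns instead of blocking
    lock.release()                                 # locked -> unlocked
    observed.append(lock.locked())                 # False again
    try:
        lock.release()                             # releasing an unlocked lock is an error
    except RuntimeError:
        observed.append("RuntimeError")
    return observed


if __name__ == "__main__":
    print(lock_rules())  # [False, True, False, False, 'RuntimeError']
```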
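Locking is a conservative approach: it serializes every access to the shared resource, whether or not a conflict would actually occur.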
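A `Lock` also works as a context manager, so an `acquire()`/`release()` pair can be written as a `with` block that releases the lock even if the body raises. The sketch below is one possible form. The names `count_with_lock`, `add_many`, `counter`, and `counter_lock` are made up for illustration.

```python
import threading

counter = 0
counter_lock = threading.Lock()


def add_many(n):
    """Increment the shared counter n times, one locked step at a time."""
    global counter
    for _ in range(n):
        with counter_lock:  # acquire() on entry, release() on exit
            counter += 1


def count_with_lock(num_threads=4, n=10_000):
    """Run several incrementing threads and return the final counter value."""
    global counter
    counter = 0
    threads = [threading.Thread(target=add_many, args=(n,)) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


if __name__ == "__main__":
    print(count_with_lock())  # 40000
```

The `with` form removes the risk of forgetting `release()` on an early return or exception.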
