https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/
线程基本介绍
几乎所有操作系统都支持
几乎所有语言都有相应的多线程机制
线程是独立的处理流程,可以和系统的其他线程并行或者并发地执行。
多线程可以共享数据和资源,利用所谓的共享内存空间。
每一个线程基本元素
- 程序计数器
- 寄存器
- 栈
与同一进程的其他线程共享的资源基本上包括
- 数据
- 系统资源
线程的状态大体上可以分为
- ready
- running
- blocked
多线程编程一般使用共享内存空间进行线程间的通讯。这就使得管理内存空间成为多线程编程的重点和难点。
python 线程模块
Python通过标准库的 threading
模块来管理线程。这个模块提供了很多不错的特性,让线程变得无比简单。实际上,线程模块提供了几种同时运行的机制,实现起来非常简单。
线程模块的主要组件如下:
- 线程对象 Thread objects
- Lock对象 Lock objects
- RLock对象 RLock objects(reentrant,可重入锁)
- 信号量对象 Semaphore objects
- 条件对象 Condition objects
- 事件对象 Event objects
- 定时器对象 Timer objects
- 栅栏对象 Barrier objects
Thread objects
定义一个线程
最简单的一个方法是,用一个目标函数实例化一个 Thread类,然后调用它的 start方法启动它。
class threading.Thread(
group=None,
target=None, # 线程启动时要执行的函数,可以简单理解为一个线程就是一个函数
name=None, # 可以指定线程的名字
args=(), # 指定传递给target函数的参数
kwargs={} # 同上
)
线程 demo
import threading
# def worker():
# print('welcome shenzhen')
# print('thread over')
# if __name__ == "__main__":
# t = threading.Thread(target=worker)
# t.start()
def function(i):
    """Report which thread index made the call, together with its square.

    Args:
        i: Index handed over by the thread's creator. It is printed as-is
            and multiplied by itself, so it must support ``*``.

    Returns:
        None. The message is only written to standard output.
    """
    # The literal "\n" plus print()'s own line ending leaves a blank line
    # after every message.
    print(f"function called by thread {i} - {i * i}\n")
# Every Thread object created by the demo loop below is collected here.
threads = []

if __name__ == "__main__":
    for i in range(100):
        t = threading.Thread(target=function, args=(i,))
        threads.append(t)
        t.start()
        # t.join()  # the main thread would wait for this thread to finish
        #           # before starting the next loop iteration
    # The output differs depending on whether t.join() above is enabled;
    # think about why (see the join() documentation).
    # Run this with IDLE; the effect may not be visible inside an IDE.
'''
join(timeout=None)
Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is called terminates
– either normally or through an unhandled exception – or until the optional timeout occurs.
'''
Python 的线程没有优先级,也没有线程组的概念(构造函数源码有预留group参数,以后可能实现ThreadGroup类来支持);线程也不能被销毁、停止、挂起或中断。
threading的属性和方法
名称 | 含义 | |
---|---|---|
current_thread() | 返回当前线程对象 | |
main_thread() | 返回主线程对象 | |
active_count() | 当前处于alive状态的线程个数 | |
enumerate() | 返回所有活着的线程的列表,不包括已经终止的和未开始的。 | |
get_ident() | 返回当前线程的id,非0整数 | |
返回当前线程对象
import threading
import time
def worker():
    """Sleep for half a second, then print the thread that ran this call."""
    time.sleep(0.5)
    # current_thread() is the supported spelling; the camelCase
    # currentThread() alias is deprecated since Python 3.10.
    print(f'welcome shenzhen, current thread is {threading.current_thread()}')
if __name__ == "__main__":
    # Launch ten threads that all run worker(); none of them is joined here.
    for _ in range(10):
        threading.Thread(target=worker).start()
"""
(pythonparallel-jU7W1wcf) λ python pythreaddemo.py
welcome shenzhen, current thread is <Thread(Thread-1, started 1392)>
welcome shenzhen, current thread is <Thread(Thread-4, started 14376)>
welcome shenzhen, current thread is <Thread(Thread-2, started 15436)>
welcome shenzhen, current thread is <Thread(Thread-7, started 15860)>
welcome shenzhen, current thread is <Thread(Thread-5, started 2928)>
welcome shenzhen, current thread is <Thread(Thread-8, started 15772)>
welcome shenzhen, current thread is <Thread(Thread-10, started 1312)>
welcome shenzhen, current thread is <Thread(Thread-6, started 11164)>
welcome shenzhen, current thread is <Thread(Thread-9, started 12056)>
welcome shenzhen, current thread is <Thread(Thread-3, started 11276)>
"""
"""
# Thread 的 repr 方法
def __repr__(self):
assert self._initialized, "Thread.__init__() was not called"
status = "initial"
if self._started.is_set():
status = "started"
self.is_alive() # easy way to get ._is_stopped set when appropriate
if self._is_stopped:
status = "stopped"
if self._daemonic:
status += " daemon"
if self._ident is not None:
status += " %s" % self._ident
return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status)
"""
确认当前线程
使用参数来确认或者命名是笨拙且没有必要的。
import threading
import time
def first_fun():
    """Print the running thread's name on entry and again after a 2 s sleep."""
    # Thread.name replaces the deprecated currentThread().getName() chain.
    # The name is looked up again after the sleep, as the original did.
    print(f"{threading.current_thread().name} is Starting ")
    time.sleep(2)
    print(f"{threading.current_thread().name} is Exiting ")
def second_fun():
    """Print the running thread's name on entry and again after a 2 s sleep."""
    # Thread.name replaces the deprecated currentThread().getName() chain.
    # The name is looked up again after the sleep, as the original did.
    print(f"{threading.current_thread().name} is Starting ")
    time.sleep(2)
    print(f"{threading.current_thread().name} is Exiting ")
def third_fun():
    """Print the running thread's name on entry and again after a 2 s sleep."""
    # Thread.name replaces the deprecated currentThread().getName() chain.
    # The name is looked up again after the sleep, as the original did.
    print(f"{threading.current_thread().name} is Starting ")
    time.sleep(2)
    print(f"{threading.current_thread().name} is Exiting ")
if __name__ == "__main__":
    # Two threads receive explicit names; the third is created without one.
    t1 = threading.Thread(name='first_func', target=first_fun)
    t2 = threading.Thread(name='second_func', target=second_fun)
    t3 = threading.Thread(target=third_fun)
    workers = (t1, t2, t3)

    # Start all three before joining any of them, so their sleeps overlap.
    for t in workers:
        t.start()
    for t in workers:
        t.join()
实现一个线程
使用 threading模块实现一个新的线程,需要下面3步
- 定义一个 Thread 类的子类(继承Thread类)
- 重写 __init__(self [, args]) 方法,可以添加自己的参数
- 重写 run(self [, args]) 方法,实现线程要做的事
import threading
import time
# print_time() tests this on every loop iteration and takes its thread-exit
# branch when the value is truthy.
exitFlag = 0
class myThread(threading.Thread):
    """Thread subclass whose run() delegates to print_time().

    Args:
        threadID: Caller-chosen identifier, stored on the instance.
        name: Thread name, assigned to ``self.name``.
        counter: Forwarded to print_time() as its ``counter`` argument.
    """

    def __init__(self, threadID, name, counter):
        # Thread.__init__ must run before any attribute such as `name` is set.
        super().__init__()
        self.threadID = threadID
        self.name = name
        self.counter = counter

    def run(self):
        """Announce the start, call print_time(), then announce the exit."""
        print(f"Starting {self.name}")
        # 5 is passed as print_time()'s `delay` argument.
        print_time(self.name, self.counter, 5)
        print(f"Exiting {self.name}")
def print_time(threadName, counter, delay):
    """Print a timestamp ``counter`` times, sleeping ``delay`` seconds before each.

    Args:
        threadName: Label placed in front of every timestamp line.
        counter: Number of timestamps to print; values <= 0 print nothing.
        delay: Seconds passed to time.sleep() before each print.

    Raises:
        SystemExit: If the module-level ``exitFlag`` is truthy when an
            iteration starts.
    """
    # `> 0` rather than plain truthiness, so a negative counter cannot loop
    # forever.
    while counter > 0:
        if exitFlag:
            # The original called thread.exit(), but no `thread` name exists
            # in Python 3. _thread.exit() is documented as raising SystemExit,
            # so raise it directly and avoid the extra import.
            raise SystemExit
        time.sleep(delay)
        print(f"{threadName}: {time.ctime()}")
        counter -= 1
if __name__ == "__main__":
    # The third constructor argument is the counter that run() forwards to
    # print_time().
    t1 = myThread(1, "Thread-1", 1)
    t2 = myThread(2, "Thread-2", 2)
    workers = (t1, t2)

    # Start both threads before joining either one.
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    print("EXiting Main Thread")
输出
(pythonparallel-jU7W1wcf) D:\projects\pythoncode\pythonparallel>python thirddemo.py
Starting Thread-1
Starting Thread-2
Thread-1: Sun Jun 23 10:47:50 2019
Exiting Thread-1
Thread-2: Sun Jun 23 10:47:50 2019
Thread-2: Sun Jun 23 10:47:55 2019 # 线程2,第二次运行 sleep(5) 秒后,输出当前时间
Exiting Thread-2
EXiting Main Thread
线程同步- lock
当两个或者以上共享内存的操作发生在并发线程中时,并且至少一个可以写数据,又没有同步机制的条件下,就会产生竞争条件,可能会导致执行无效代码、bug、或者异常行为。
竞争条件最简单的解决方法就是使用锁。
锁的操作非常简单,当一个线程需要访问部分共享内存时,它必须先获得锁才能访问。此线程对这部分共享资源使用完成之后,该线程必须释放锁,然后其他线程就可以拿到这个锁并访问这部分资源了。
保证同一时刻只有一个线程允许访问共享内存。
并行 -> 串行
带来的问题:
- 死锁(例如两个以上线程各自持有一个锁,又互相等待对方释放时)
如下事例
import threading
# Number of iterations each worker loop performs.
COUNT = 1_000_000

# Lock taken by the *_with_lock functions, and the counter they update.
shared_resource_lock = threading.Lock()
shared_resource_with_lock = 0

# Counter updated by the *_without_lock functions, with no lock involved.
shared_resource_with_no_lock = 0
def increment_with_lock():
    """Add 1 to ``shared_resource_with_lock`` COUNT times, holding the lock."""
    global shared_resource_with_lock
    for _ in range(COUNT):
        # `with` releases the lock even if the update raises; the manual
        # acquire()/release() pair would have left it held.
        with shared_resource_lock:
            shared_resource_with_lock += 1
def decrement_with_lock():
    """Subtract 1 from ``shared_resource_with_lock`` COUNT times, holding the lock."""
    global shared_resource_with_lock
    for _ in range(COUNT):
        # `with` releases the lock even if the update raises; the manual
        # acquire()/release() pair would have left it held.
        with shared_resource_lock:
            shared_resource_with_lock -= 1
def increment_without_lock():
    """Add 1 to ``shared_resource_with_no_lock`` COUNT times without any lock."""
    global shared_resource_with_no_lock
    for _ in range(COUNT):
        # No lock is taken around this read-modify-write.
        shared_resource_with_no_lock += 1
def decrement_without_lock():
    """Subtract 1 from ``shared_resource_with_no_lock`` COUNT times without any lock."""
    global shared_resource_with_no_lock
    for _ in range(COUNT):
        # No lock is taken around this read-modify-write.
        shared_resource_with_no_lock -= 1
if __name__ == "__main__":
    # One incrementing and one decrementing thread per counter: the first
    # pair uses the lock, the second pair does not.
    t1 = threading.Thread(target=increment_with_lock)
    t2 = threading.Thread(target=decrement_with_lock)
    t3 = threading.Thread(target=increment_without_lock)
    t4 = threading.Thread(target=decrement_without_lock)
    workers = (t1, t2, t3, t4)

    # Start all four before joining any, then wait for every one of them
    # before the counters are read.
    for t in workers:
        t.start()
    for t in workers:
        t.join()

    print(f"the value of shared variable with lock management is {shared_resource_with_lock}")
    print(f"the value of shared variable with race condition is {shared_resource_with_no_lock}")
'''
D:\projects\pythoncode\pythonparallel>python lockdemo.py
the value of shared variable with lock management is 0 # 正确,两个线程并行了同样次数的操作
the value of shared variable with race condition is 273145 # 异常,修改全局对象,未使用锁,结果不确定
'''
总结
锁有两种状态
- locked
- unlocked
锁有两个方法来操作
- acquire()
- release()
需要遵循以下规则:
- 如果状态是unlocked,可以调用 acquire() 方法将状态改为 locked
- 如果状态是locked,acquire() 会被block直到另一个线程调用 release()释放锁
- 如果状态是 unlocked,调用 release() 将导致 RuntimeError 异常
- 如果状态是 locked, 可以调用 release() 将状态改为 unlocked
锁是一种保守的方法。