- 1. threading常用模块静态方法
- 2. threading.Thread
- 3. 继承自threading.Thread的封装线程
- 3.1 Timer — 指定时间后执行线程启动
- 3.1.1 Timer类原型说明
- 源码">3.1.2 Timer实现源码
- 3.1 Timer — 指定时间后执行线程启动
- 4. 线程间通信
- 5. 小结
1. threading常用模块静态方法
1.1 threading.current_thread()
return the current Thread Object
1.2 threading.enumerate()
return a list of all Thread objects currently alive (excludes terminated threads and threads that have not yet been started)
1.3 threading.main_thread()
2. threading.Thread
2.1 Thread类原型说明
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
- target is the callable object to be invoked by the
run()
- name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number
- args is the argument tuple for the target invocation. Defaults to
()
- kwargs is a dictionary of keyword arguments for the target invocation. Defaults to
{}
线程启动,必须要使用 Thread.start() 方法
若通过继承Thread类,可对 Thread.run()
方法进行重写
import threading
import logging
import time
logging.basicConfig(level=logging.DEBUG,
format='%(threadName)-9s %(message)s')
class MyThread(threading.Thread):
def __init__(self, args=(), kwargs=None):
super().__init__()
self.args = args
self.kwargs = kwargs
return
def run(self):
logging.debug('running with %s and %s', self.args, self.kwargs)
return
2.2 Thread重要方法
2.2.1 join(timeout=None)
join(timeout=None)
Wait until the thread terminates. This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception – or until the optional timeout occurs.
When the timeout _argument is present and not None
, it should be a floating point number specifying a timeout for the operation in seconds
As _join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join()call timed out
2.2.2 is_alive()
Return whether the thread is alive.
This method returns True
just before the run()
method starts until just after the run()
method terminates.
Note:配合join(timeout=xxx),进入 超时等待阻塞
状态,避免无法响应调用join方法的线程的特殊动作。
Ctrl-C退出线程示例
def main():
try:
thread1.start()
## thread is totally blocking e.g. while (1)
while True:
thread1.join(2) # 避免主线程一直处于等待阻塞状态而无法响应Exception
if not thread1.isAlive: # 判别主线程重返运行态原因:超时 or 子线程结束
break
except KeyboardInterrupt:
print "Ctrl-c pressed ..."
sys.exit(1)
2.2.3 isDaemon() & setDaemon()
**
3. 继承自threading.Thread的封装线程
3.1 Timer — 指定时间后执行线程启动
3.1.1 Timer类原型说明
class threading.Timer(interval, function, args=None, kwargs=None)
Create a timer that will run function with arguments args and keyword arguments kwargs, after intervalseconds have passed
特殊方法:
cancel
: Stop the timer, and cancel the execution of the timer’s action. This will only work if the timer is still in its waiting stage
3.1.2 Timer实现源码
class Timer(Thread):
"""Call a function after a specified number of seconds:
t = Timer(30.0, f, args=None, kwargs=None)
t.start()
t.cancel() # stop the timer's action if it's still waiting
"""
def __init__(self, interval, function, args=None, kwargs=None):
Thread.__init__(self)
self.interval = interval
self.function = function
self.args = args if args is not None else []
self.kwargs = kwargs if kwargs is not None else {}
self.finished = Event()
def cancel(self):
"""Stop the timer if it hasn't finished yet."""
self.finished.set()
def run(self):
self.finished.wait(self.interval)
if not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.set()
Timer类实现主要依赖了 Event机制 ,请见4.1.2节。
4. 线程间通信
Thread synchronization:
threading.Thread.join
threading.``Event
threading.``Condition
threading.``Semaphore
Resource Lock:
threading.``Lock
threading.RLock
4.1 Thread synchronization
4.1.1 threading.Thread.join
见 2.2.1节
4.1.2 threading.Event
4.1.2.1 Event类原型说明
Event object is one of the simplest mechanisms for communication between threads:one thread signals an event and other threads wait for it .
An event object manages an internal flag that can be set to true with the set()
method and reset to false with the [clear()](https://docs.python.org/3.6/library/threading.html#threading.Event.clear)
method. The wait()
method blocks until the flag is true.
Event可以认为是Condition的高层封装结构:
- 不可 外传Lock 参数
- 增加首次check条件 内部flag,若不满足条件才进行wait
- 唤醒通知将notify所有等待该event的处于等待阻塞状态的线程
4.1.2.2 Event重要方法
is_set()
Return true if and only if the internal flag is true
set()
Acquire Condition’s Lock, set the internal flag to True, then notify_all
clear()
Acquire Condition’s Lock, set the internal flag to false.
wait(timeout=None)
Acquire Condition’s Lock and check the internal flag fist.
if flag is True, return True, do not block;
if flag is False, call the condition’s wait method, block until the internal flag is true or timeout. Return the wait method’s return.
4.1.2.3 Event实现源码
与预期分析一致,Event为Condition的高层封装结构,不外传Lock,且有先置check条件,唤醒通知将通知所有等待该event的处于等待阻塞状态的线程。
class Event:
"""Class implementing event objects.
Events manage a flag that can be set to true with the set() method and reset
to false with the clear() method. The wait() method blocks until the flag is
true. The flag is initially false.
"""
# After Tim Peters' event class (without is_posted())
def __init__(self):
self._cond = Condition(Lock())
self._flag = False
def _reset_internal_locks(self):
# private! called by Thread._reset_internal_locks by _after_fork()
self._cond.__init__(Lock())
def is_set(self):
"""Return true if and only if the internal flag is true."""
return self._flag
isSet = is_set
def set(self):
"""Set the internal flag to true.
All threads waiting for it to become true are awakened. Threads
that call wait() once the flag is true will not block at all.
"""
with self._cond:
self._flag = True
self._cond.notify_all()
def clear(self):
"""Reset the internal flag to false.
Subsequently, threads calling wait() will block until set() is called to
set the internal flag to true again.
"""
with self._cond:
self._flag = False
def wait(self, timeout=None):
"""Block until the internal flag is true.
If the internal flag is true on entry, return immediately. Otherwise,
block until another thread calls set() to set the flag to true, or until
the optional timeout occurs.
When the timeout argument is present and not None, it should be a
floating point number specifying a timeout for the operation in seconds
(or fractions thereof).
This method returns the internal flag on exit, so it will always return
True except if a timeout is given and the operation times out.
"""
with self._cond:
signaled = self._flag
if not signaled:
signaled = self._cond.wait(timeout)
return signaled
4.1.3 threading.Condition
4.1.3.1 线程状态
线程分为5种状态:初始态,就绪态,运行态,阻塞态,终止态。如图所示:
4.1.3.2 条件变量线程状态切换及资源锁定时序图示例
4.1.3.3 线程等待阻塞状态及同步阻塞状态转换图
注:
- 步骤7中的synchronized,即为 条件变量释放锁 动作
- 调用条件变量的 wait或notify 方法时,必须要满足的两个条件:
- 线程 获取了资源锁
- 线程处于 运行态
wait方法,实际触发了两个动作:
i. 调用条件变量wait方法的线程,释放条件变量的底层锁
ii. 调用条件变量wait方法的线程,从 运行态 切换到 等待阻塞状态
4.1.3.4 Condition类原型说明
class threading.Condition(lock=None)
This class implements condition variable objects. A condition variable allows one or more threads to wait until they are notified by another thread.
If the lock argument is given and not None, it must be a Lock or RLock object, and it is used as the underlying lock. Otherwise, a new RLock object is created and used as the underlying lock.
4.1.3.5 Condition重要方法
- wait(timeout=None)
Wait until notified or until a timeout occurs. If the calling thread has not acquired the lock when this method is called, a RuntimeError is raised.
This method releases the underlying lock, and then blocks until it is awakened by a notify() or notify_all() call for the same condition variable in another thread, or until the optional timeout occurs. Once awakened or timed out, it re-acquires the lock and returns.
The return value is True unless a given timeout expired, in which case it is False.
- wait_for(predicate, timeout=None)
Wait until a condition evaluates to true. predicate should be a callable which result will be interpreted as a boolean value. A timeout may be provided giving the maximum time to wait
The lock must be held when called and is re-acquired on return
- notify(n=1)
By default, wake up one thread waiting on this condition, if any. If the calling thread has not acquired the lock when this method is called, a RuntimeError is raised.
Tips: 在调用notify前,确保已经获取到锁
Note: an awakened thread does not actually return from its wait() call until it can reacquire the lock. Since notify() does not release the lock, its caller should.
Tips: notify并不释放锁,故并非调用notify后等待线程立即被唤醒。等待线程被notify后,有等待阻塞状态,切换到同步阻塞状态等待CPU调度。
- notify_all()
Wake up all threads waiting on this condition.
Note: 支持 上下文管理器
4.1.4 threading.semaphore
4.1.4.1 semaphore类原型说明
class threading.Semaphore(value=1)
This is one of the oldest synchronization primitives in the history of computer science, invented by the early Dutch computer scientist Edsger W. Dijkstra (he used the names P() and V() instead of acquire() andrelease()). A semaphore manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some other thread calls release(). — from https://docs.python.org/3.6/library/threading.html
A semaphore manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some other thread calls release().
Tips: 限制访问线程数量。在受限系统资源下,仅允许一定数量的服务线程同时提供服务。
4.1.4.2 semaphore重要方法
- acquire(blocking=True, timeout=None)
When invoked without arguments:
- If the internal counter is larger than zero on entry, decrement it by one and return true immediately.
- If the internal counter is zero on entry, block until awoken by a call to release(). Once awoken (and the counter is greater than 0), decrement the counter by 1 and return true. Exactly one thread will be awoken by each call to release(). The order in which threads are awoken should not be relied on.
- release()
Release a semaphore, incrementing the internal counter by one. When it was zero on entry and another thread is waiting for it to become larger than zero again, wake up that thread.
Note: 支持 上下文管理器
4.1.4.3 semaphore实现源码
class Semaphore:
"""This class implements semaphore objects.
Semaphores manage a counter representing the number of release() calls minus
the number of acquire() calls, plus an initial value. The acquire() method
blocks if necessary until it can return without making the counter
negative. If not given, value defaults to 1.
"""
# After Tim Peters' semaphore class, but not quite the same (no maximum)
def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock())
self._value = value
def acquire(self, blocking=True, timeout=None):
"""Acquire a semaphore, decrementing the internal counter by one.
When invoked without arguments: if the internal counter is larger than
zero on entry, decrement it by one and return immediately. If it is zero
on entry, block, waiting until some other thread has called release() to
make it larger than zero. This is done with proper interlocking so that
if multiple acquire() calls are blocked, release() will wake exactly one
of them up. The implementation may pick one at random, so the order in
which blocked threads are awakened should not be relied on. There is no
return value in this case.
When invoked with blocking set to true, do the same thing as when called
without arguments, and return true.
When invoked with blocking set to false, do not block. If a call without
an argument would block, return false immediately; otherwise, do the
same thing as when called without arguments, and return true.
When invoked with a timeout other than None, it will block for at
most timeout seconds. If acquire does not complete successfully in
that interval, return false. Return true otherwise.
"""
if not blocking and timeout is not None:
raise ValueError("can't specify timeout for non-blocking acquire")
rc = False
endtime = None
with self._cond:
while self._value == 0:
if not blocking:
break
if timeout is not None:
if endtime is None:
endtime = _time() + timeout
else:
timeout = endtime - _time()
if timeout <= 0:
break
self._cond.wait(timeout)
else:
self._value -= 1
rc = True
return rc
__enter__ = acquire
def release(self):
"""Release a semaphore, incrementing the internal counter by one.
When the counter is zero on entry and another thread is waiting for it
to become larger than zero again, wake up that thread.
"""
with self._cond:
self._value += 1
self._cond.notify()
def __exit__(self, t, v, tb):
self.release()
4.1.5 threading.Barrier
4.1.5.1 Barrier类原型说明
class threading.Barrier(parties, action=None, timeout=None)
Create a barrier object for parties number of threads. An action, when provided, is a callable to be called by one of the threads when they are released. timeout is the default timeout value if none is specified for the wait() method.
This class provides a simple synchronization primitive for use by a fixed number of threads that need to wait for each other.
Each of the threads tries to pass the barrier by calling the wait() method and will block until all of the threads have made their wait() calls. At this point, the threads are released simultaneously.
Barrier可以认为是Condition的高层封装结构:
- 不可 外传Lock 参数
- 内部数据结构控制流程
4.1.5.2 Barrier重要方法
- parties
The number of threads required to pass the barrier
- n_waiting
The number of threads currently waiting in the barrier.
- broken
A boolean that is True if the barrier is in the broken state.
- _exception _threading.BrokenBarrierError
This exception, a subclass of RuntimeError, is raised when the Barrier object is reset or broken.
- wait(timeout=None)
Pass the barrier. When all the threads party to the barrier have called this function, they are all released simultaneously. If a timeout is provided, it is used in preference to any that was supplied to the class constructor.
The return value is an integer in the range [0, parties-1] , different for each thread. This can be used to select a thread to do some special housekeeping
If an action was provided to the constructor, one of the threads will have called it prior to being released.
If the call times out, the barrier is put into the broken state.
If the barrier is put into the broken state, this call raise an BrokenBarrierError.
- reset()
Return the barrier to the default, empty state. Any threads waiting on it will receive the BrokenBarrierError exception.
Note:
- set Barrier._state to init filling state #0 filling, 1, draining, -1 resetting, -2 broken
- start a new Barrier loop
- abort()
Put the barrier into a broken state. This causes any active or future calls to wait() to fail with the BrokenBarrierError. Use this for example if one of the needs to abort, to avoid deadlocking the application.
Note:
- set barrier.broken to broken state, can not work normally
- in broken state, can not start new sub-thread with the same barrier
- any thread called wait and still waiting__ _or _future call wait will raise BrokenBarrierError
- if threads have been woken up after calling wait, abort method will not break the alive sub-threads
4.2 Resource Lock
4.2.1 threading.Lock
4.2.1.1 Lock类原型说明
The class implementing primitive lock objects. Once a thread has acquired a lock, subsequent attempts to acquire it block, until it is released; any thread may release it.
4.2.1.2 Lock重要方法
- acquire(blocking=True, timeout=-1)
The return value is True if the lock is acquired successfully, False if not (for example if the timeout expired).
- release()
Release a lock. This can be called from any thread, not only the thread which has acquired the lock.
There is no return value.
Note: 支持 上下文管理器
4.2.2 threading.RLock
4.2.2.1 RLock类原型说明
A reentrant lock is a synchronization primitive that may be acquired multiple times by the same thread. Internally, it uses the concepts of “owning thread” and “recursion level” in addition to the locked/unlocked state used by primitive locks. In the locked state, some thread(not threads) owns the lock; in the unlocked state, no thread owns it.
To lock the lock, a thread calls its acquire() method; this returns once the thread owns the lock. To unlock the lock, a thread calls its release() method. acquire()/release() call pairs may be nested; only the final release()(the release() of the outermost pair) resets the lock to unlocked and allows another thread blocked in acquire()to proceed.
4.2.2.2 RLock重要方法
- acquire(blocking=True, timeout=-1)
When invoked without arguments: if this thread already owns the lock, increment the recursion level by one, and return immediately. Otherwise, if another thread owns the lock, block until the lock is unlocked.
Once the lock is unlocked (not owned by any thread), then grab ownership, set the recursion level to one, and return. If more than one thread is blocked waiting until the lock is unlocked, only one at a time will be able to grab ownership of the lock. There is no return value in this case.
- release()
Release a lock, decrementing the recursion level. If after the decrement it is zero, reset the lock to unlocked (not owned by any thread), and if any other threads are blocked waiting for the lock to become unlocked, allow exactly one of them to proceed. If after the decrement the recursion level is still nonzero, the lock remains locked and owned by the calling thread.
Note: 支持 上下文管理器
5. 小结
线程间通信的方式种类繁多,现做以下总结常用实用场景:
- 线程延迟启动: threading.Timer
- 等待多个线程完成后再继续执行:threading.Thread.join
- 线程顺序执行:threading.Thread.join
- 等待某个时机被唤醒:threading.Condition.wait/notify/notify_all, or threading.Event
- 限制资源同时访问数:threading.semaphore
- 多个子线程准备工作完成后,同时启动执行:threading.Barrier
重要概念:
- 同步:在发出一个功能调用时,在没有得到结果之前,该调用就不返回,同时其它线程也不能调用这个方法。
- 协同:多个进程、线程之间协同步调,按预定的先后次序进行运行。
在多线程编程里面,一些敏感数据不允许被多个线程同时访问,此时就使用同步访问技术,保证数据在任何时刻,最多有一个线程访问,以保证数据的完整性。