本地线程
Threading Local
为什么需要 Threading Local
在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。
但是局部变量也有问题,就是在函数调用的时候,传递起来很麻烦:
def process_student(name):
std = Student(name)
# std是局部变量,但是每个函数都要用它,因此必须传进去:
do_task_1(std)
do_task_2(std)
def do_task_1(std):
do_subtask_1(std)
do_subtask_2(std)
def do_task_2(std):
do_subtask_2(std)
do_subtask_2(std)
每个函数一层一层调用都这么传参数那还得了?用全局变量?也不行,因为每个线程处理不同的 Student
对象,不能共享。
如果用一个全局 dict
存放所有的 Student
对象,然后以 thread
自身作为 key
获得线程对应的 Student
对象如何?
global_dict = {}
def std_thread(name):
std = Student(name)
# 把std放到全局变量global_dict中:
global_dict[threading.current_thread()] = std
do_task_1()
do_task_2()
def do_task_1():
# 不传入std,而是根据当前线程查找:
std = global_dict[threading.current_thread()]
...
def do_task_2():
# 任何函数都可以查找出当前线程的std变量:
std = global_dict[threading.current_thread()]
...
这种方式理论上是可行的,它最大的优点是消除了std
对象在每层函数中的传递问题,但是,每个函数获取std
的代码有点丑。
Threading Local 的使用
Threading Local
应运而生,不用查找 dict
,ThreadLocal
帮你自动做这件事:
import threading
# 创建全局ThreadLocal对象:
local_school = threading.local()
def process_student():
# 获取当前线程关联的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 绑定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
执行结果:
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
全局变量 local_school
就是一个ThreadLocal
对象,每个 Thread
读写的是自己线程的 student
属性,互不影响,也不用管理锁的问题,ThreadLocal
内部会处理。
可以理解为全局变量 local_school
是一个dict
,key 为线程id,value 是线程局部变量的字典
ThreadLocal
最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
Threading Local 的实现
Werkzeug Local
为什么需要 Werkzeug Local
使用 ThreadLocal
对象虽然可以基于线程存储全局变量,但是在Web应用中可能会存在如下问题:
- 有些应用使用的是
greenlet 协程
,这种情况下无法保证协程之间数据的隔离,因为不同的协程可以在同一个线程当中 - 即使使用的是线程,WSGI应用也无法保证每个http请求使用的都是不同的线程,因为后一个http请求可能使用的是之前的http请求的线程,这样的话存储于
ThreadLocal
中的数据可能是之前残留的数据
为了解决上述问题,Werkzeug开发了自己的local对象,这也是为什么我们需要Werkzeug的local对象
Werkzeug Local 的使用
先举一个简单的示例:
from werkzeug.local import Local, LocalManager
local = Local()
local_manager = LocalManager([local])
def application(environ, start_response):
local.request = request = Request(environ)
...
# make_middleware会确保当request结束时,所有存储于local中的对象的reference被清除
application = local_manager.make_middleware(application)
- 首先
Local
对象需要通过LocalManager
来管理,初次生成LocalManager
对象需要传一个list类型的参数,list中是Local对象,当有新的Local对象时,可以通过local_manager.locals.append()
来添加。而当LocalManager对象清理的时候会将所有存储于locals
中的当前context
的数据都清理掉 - 上例中当
local.request
被赋值之后,其可以在当前context
中作为全局数据使用 - 所谓当前
context(the same context)
意味着是在同一个greenlet(如果有)中,也就肯定是在同一个线程当中
那么Werkzeug的Local对象是如何实现这种在相同的 context
环境下保证数据的全局性和隔离性的呢?
Werkzeug Local 的实现
我们先来看下源代码
# 有greenlet的情况下,get_indent实际获取的是greenlet的id
# 没有greenlet的情况下获取的是thread id
try:
from greenlet import getcurrent as get_ident
except ImportError:
try:
from thread import get_ident
except ImportError:
from _thread import get_ident
class Local(object):
__slots__ = ('__storage__', '__ident_func__')
def __init__(self):
object.__setattr__(self, '__storage__', {})
object.__setattr__(self, '__ident_func__', get_ident)
def __iter__(self):
return iter(self.__storage__.items())
# 当调用Local对象时,返回对应的LocalProxy
def __call__(self, proxy):
"""Create a proxy for a name."""
return LocalProxy(self, proxy)
# Local类中特有的method,用于清空greenlet id或线程id对应的dict数据
def __release_local__(self):
self.__storage__.pop(self.__ident_func__(), None)
def __getattr__(self, name):
try:
return self.__storage__[self.__ident_func__()][name]
except KeyError:
raise AttributeError(name)
def __setattr__(self, name, value):
ident = self.__ident_func__()
storage = self.__storage__
try:
storage[ident][name] = value
except KeyError:
storage[ident] = {name: value}
def __delattr__(self, name):
try:
del self.__storage__[self.__ident_func__()][name]
except KeyError:
raise AttributeError(name)
- Werkzeug 使用了自定义的
__storage__
保存不同线程下的状态- key使用的就是
get_indent
函数获取的id(当有greenlet时使用greenlet id,没有则使用thread id - value 是一个dict,是 greenlet(或者线程) 对应的 local 存储空间
- key使用的就是
- 通过重新实现
__getattr__
,__setattr__
等魔术方法,我们在greenlet或者线程中使用local对象时,实际会自动获取greenlet id(或者线程id),从而获取到对应的 dict 存储空间,再通过name key就可以获取到真正的存储的对象。这个技巧实际上在编写线程安全或协程安全的代码时是非常有用的,即通过线程id(或协程id)来分别存储数据。 - Werkzeug 使用
get_ident
函数来获取线程/协程标识符 - Werkzeug 提供了释放本地线程
local
数据的release_local
方法,如下>>> loc = Local()
>>> loc.foo = 42
>>> release_local(loc) # release_local实际调用local对象的__release_local__ 方法
>>> hasattr(loc, 'foo')
False
Werkzeug 基于自己实现的 Local
还实现了两种数据结果 :
- LocalStack : 基于
werkzeug.local.Local
实现的栈结果 , 可以将对象推入 , 弹出 , 也可以快速拿到栈顶对象 - LocalProxy : 作用和名字一样 , 最标准的代理模式 , 构造此结构时接收一个可以调用的参数 (一般为函数) , 这个函数执行后就是通过
LocalStack
实例化的栈的栈顶对象 ; 对于 LocalProxy 对象的操作实际上都会转发到这个栈顶对象 (也就是一个thread-local
对象) 上面
Werkzeug LocalStack 的实现
LocalStack
与 Local
对象类似,都是可以基于 Greenlet 协程或者线程进行全局存储的存储空间(实际 LocalStack
是对 Local
进行了二次封装),区别在于其数据结构是栈的形式。示例如下:
>>> ls = LocalStack()
>>> ls.push(42)
>>> ls.top
42
>>> ls.push(23)
>>> ls.top
23
>>> ls.pop()
23
>>> ls.top
42
- 从示例看出
Local
对象存储的时候是类似字典的方式,需要有 key 和 value,而LocalStack
是基于栈的,通过push和pop来存储和弹出数据 - 另外,当我们想释放存储空间的时候,也可以调用release_local()
LocalStack 实现
class LocalStack(object):
def __init__(self):
self._local = Local()
def __release_local__(self):
self._local.__release_local__()
@property
def __ident_func__(self):
return self._local.__ident_func__
@__ident_func__.setter
def __ident_func__(self, value):
object.__setattr__(self._local, "__ident_func__", value)
def __call__(self):
def _lookup():
rv = self.top
if rv is None:
raise RuntimeError("object unbound")
return rv
return LocalProxy(_lookup)
def push(self, obj):
"""Pushes a new item to the stack"""
rv = getattr(self._local, "stack", None)
if rv is None:
self._local.stack = rv = []
rv.append(obj)
return rv
def pop(self):
"""Removes the topmost item from the stack, will return the
old value or `None` if the stack was already empty.
"""
stack = getattr(self._local, "stack", None)
if stack is None:
return None
elif len(stack) == 1:
release_local(self._local)
return stack[-1]
else:
return stack.pop()
@property
def top(self):
"""The topmost item on the stack. If the stack is empty,
`None` is returned.
"""
try:
return self._local.stack[-1]
except (AttributeError, IndexError):
return None
LocalStack在Flask框架中会频繁的出现,其 Request Context
和 App Context
的实现都是基于 LocalStack
。
Werkzeug LocalProxy
LocalProxy
用于代理 Local
对象和 LocalStack
对象,而所谓代理就是作为中间的代理人来处理所有针对被代理对象的操作,如下图所示:
LocalProxy 的使用
初始化 LocalProxy 有三种方式:
- 通过
Local
或者LocalStack
对象的__call__
方法 ```python from werkzeug.local import Local l = Local()
these are proxies
request = l(‘request’) user = l(‘user’)
from werkzeug.local import LocalStack _response_local = LocalStack()
this is a proxy
response = _response_local()
当我们将对象作为函数调用时,实际调用的是`__call__` 方法,`__call__` 方法会返回一个 LocalProxy 对象
- 通过 `LocalProxy` 类进行初始化
```python
l = Local()
request = LocalProxy(l, 'request')
实际上这段代码跟第一种方式是等价的,但这种方式是最 原始
的方式,我们在 Local
的源代码实现中看到其__call__
方法就是通过这种方式生成 LocalProxy
的
- 使用callable对象作为参数
request = LocalProxy(get_current_request())
通过传递一个函数,我们可以自定义如何返回 Local
或 LocalStack
对象
LocalProxy 实现
下面截取LocalProxy的部分代码,我们来进行解析
# LocalProxy部分代码
@implements_bool
class LocalProxy(object):
__slots__ = ('__local', '__dict__', '__name__', '__wrapped__')
def __init__(self, local, name=None):
object.__setattr__(self, '_LocalProxy__local', local)
object.__setattr__(self, '__name__', name)
if callable(local) and not hasattr(local, '__release_local__'):
# "local" is a callable that is not an instance of Local or
# LocalManager: mark it as a wrapped function.
object.__setattr__(self, '__wrapped__', local)
def _get_current_object(self):
"""Return the current object. This is useful if you want the real
object behind the proxy at a time for performance reasons or because
you want to pass the object into a different context.
"""
# 由于所有Local或LocalStack对象都有__release_local__ 方法
# 所以如果没有该属性就表明self.__local为callable对象
if not hasattr(self.__local, '__release_local__'):
return self.__local()
try:
# 此处self.__local 为 Local 或 LocalStack 对象
return getattr(self.__local, self.__name__)
except AttributeError:
raise RuntimeError('no object bound to %s' % self.__name__)
@property
def __dict__(self):
try:
return self._get_current_object().__dict__
except RuntimeError:
raise AttributeError('__dict__')
def __getattr__(self, name):
if name == '__members__':
return dir(self._get_current_object())
return getattr(self._get_current_object(), name)
def __setitem__(self, key, value):
self._get_current_object()[key] = value
def __delitem__(self, key):
del self._get_current_object()[key]
if PY2:
__getslice__ = lambda x, i, j: x._get_current_object()[i:j]
def __setslice__(self, i, j, seq):
self._get_current_object()[i:j] = seq
def __delslice__(self, i, j):
del self._get_current_object()[i:j]
# 截取部分操作符代码
__setattr__ = lambda x, n, v: setattr(x._get_current_object(), n, v)
__delattr__ = lambda x, n: delattr(x._get_current_object(), n)
__str__ = lambda x: str(x._get_current_object())
__lt__ = lambda x, o: x._get_current_object() < o
__le__ = lambda x, o: x._get_current_object() <= o
__eq__ = lambda x, o: x._get_current_object() == o
- 首先在
__init__
方法中传递的local
参数会被赋予属性_LocalProxy__local
,该属性可以通过self.__local
进行访问,关于这一点可以看 StackOverflow的问题回答 - LocalProxy通过
_get_current_object
来获取代理的对象。需要注意的是当初始化参数为callable对象时,则直接调用以返回Local或LocalStack对象,具体看源代码的注释。 - 重载了绝大多数操作符,以便在调用LocalProxy的相应操作时,通过
_get_current_object
方法来获取真正代理的对象,然后再进行相应操作
为什么要使用LocalProxy
可是说了这么多,为什么一定要用proxy,而不能直接调用Local或LocalStack对象呢?这主要是在有多个可供调用的对象的时候会出现问题,如下图:
我们再通过下面的代码也许可以看出一二:
# use Local object directly
from werkzeug.local import LocalStack
user_stack = LocalStack()
user_stack.push({'name': 'Bob'})
user_stack.push({'name': 'John'})
def get_user():
# do something to get User object and return it
return user_stack.pop()
# 直接调用函数获取user对象
user = get_user()
print user['name']
print user['name']
打印结果是:
John
John
再看下使用LocalProxy
# use LocalProxy
from werkzeug.local import LocalStack, LocalProxy
user_stack = LocalStack()
user_stack.push({'name': 'Bob'})
user_stack.push({'name': 'John'})
def get_user():
# do something to get User object and return it
return user_stack.pop()
# 通过LocalProxy使用user对象
user = LocalProxy(get_user)
print user['name']
print user['name']
打印结果是:
John
Bob
怎么样,看出区别了吧,直接使用LocalStack对象,user一旦赋值就无法再动态更新了,而使用 Proxy,每次调用操作符(这里 []操作符
用于获取属性),都会重新获取user,从而实现了动态更新user的效果。见下图:
Flask以及Flask的插件很多时候都需要这种动态更新的效果,因此LocalProxy就会非常有用了。
上下文
本地线程是 Flask 中非常重要的一部分 , 因为在请求处理时 , 为了解决请求对象在每一个视图函数传递 (意味着每个视图函数需要像 Django 那样添加一个 request 参数) 的问题 , Flask 巧妙地使用上下文把某些对象变为全局可访问 (实际上是特定环境的局部对象的代理) , 再配合本地线程 , 这样每个线程看到的上下文对象都是不同的
在计算机中,相对于进程而言,上下文就是进程执行时的环境。具体来说就是各个变量和数据,包括所有的寄存器变量、进程打开的文件、内存信息等。可以理解上下文是环境的一个快照,是一个用来保存状态的对象。在程序中我们所写的函数大都不是单独完整的,在使用一个函数完成自身功能的时候,很可能需要同其他的部分进行交互,需要其他外部环境变量的支持,上下文就是给外部环境的变量赋值,使函数能正确运行。
Flask提供了两种上下文,一种是应用上下文(Application Context),一种是请求上下文(Request Context)。
请求上下文
请求上下文示例
from flask import request
@app.route('/')
def index():
user_agent = request.headers.get('User-Agent')
return '<p>Your browser is %s</p>' % user_agent
其流程是这样的 :
- 用户访问产生请求
- 在发生请求的过程中向
_request_ctx_stack
推入这个请求上下文对象 , 它会变成栈顶 , request 就会成为这个请求上下文 , 也就包含了本次请求相关的信息和数据 - 在视图函数中就可以使用
request.args.get('User-Agent')
获取请求信息
Flask 中四种请求hook
Flask 中有四种请求hook
- 第一次请求处理之前的 hook 函数,通过
before_first_request
定义 - 每个请求处理之前的 hook 函数,通过
before_request
定义 - 每个请求正常处理之后的 hook 函数,通过
after_request
定义 - 不管请求是否异常都要执行的
teardown_request
hook 函数
from flask import Flask, g, request
app = Flask(__name__)
@app.before_request
def before_request():
print 'before request started'
print request.url
@app.before_request
def before_request2():
print 'before request started 2'
print request.url
g.name="SampleApp"
@app.after_request
def after_request(response):
print 'after request finished'
print request.url
response.headers['key'] = 'value'
return response
@app.teardown_request
def teardown_request(exception):
print 'teardown request'
print request.url
@app.route('/')
def index():
return 'Hello, %s!' % g.name
if __name__ == '__main__':
app.run(host='0.0.0.0', debug=True)
访问 http://localhost:5000/
后,会在控制台输出:
before request started
http://localhost:5000/
before request started 2
http://localhost:5000/
after request finished
http://localhost:5000/
teardown request
http://localhost:5000/
请求上下文实现
当请求进入时,从Flask.__call__()
开始,然后会在 wsgi_app()
方法中调用 Flask.request_context()
方法实例化 RequestContext
类作为请求上下文对象,然后调用 RequestContext
实例的 push()
方法来推入请求上下文堆栈。
wsgi_app
源码:
def wsgi_app(self, environ, start_response):
# 实例化请求上下文对象
ctx = self.request_context(environ)
error = None
try:
try:
# 将请求上下文对象压入栈中,在这之前会先将应用上下文压入栈中
ctx.push()
# 返回response对象
response = self.full_dispatch_request()
except Exception as e:
error = e
response = self.handle_exception(e)
except:
error = sys.exc_info()[1]
raise
# 调用BaseResponse的__call__方法
# 交给WSGI服务器处理
return response(environ, start_response)
finally:
if self.should_ignore_error(error):
error = None
ctx.auto_pop(error)
RequestContext
类实例化方法主要实例化Request对象、Session对象(此时为 None,会在push() 时创建)以及将Request对象与URL连接
# ctx = self.request_context(environ)
# environ是由WSGIRequestHandler.make_environ()制造而来
class RequestContext(object):
"""
请求上下文中包含了请求相关的所有信息
"""
def __init__(self, app, environ, request=None):
# Flask应用实例
self.app = app
if request is None:
# 实例化Request对象
request = app.request_class(environ)
self.request = request
# 为请求创建一个URL适配器
self.url_adapter = app.create_url_adapter(self.request)
self.flashes = None
self.session = None
# 一个隐式的应用上下文栈
self._implicit_app_ctx_stack = []
# 显示上下文是否被保留
self.preserved = False
# remembers the exception for pop if there is one in case the context
# preservation kicks in.
self._preserved_exc = None
# 请求后执行函数
self._after_request_functions = []
# 将Request对象与URL连接
self.match_request()
RequestContext.push()
并不是仅仅将请求上下文压入了栈中 , 同时它还生成了应用上下文并压入了栈中
事实上在 Web 应用环境中 , 请求上下文和应用上下文是一一对应的 , 请求上下文和应用上下文都是本地线程的
class RequestContext(object):
"""
请求上下文中包含了请求相关的所有信息
"""
...
def push(self):
"""Binds the request context to the current context."""
# 获取栈顶
top = _request_ctx_stack.top
if top is not None and top.preserved:
top.pop(top._preserved_exc)
# Before we push the request context we have to ensure that there
# is an application context.
app_ctx = _app_ctx_stack.top
if app_ctx is None or app_ctx.app != self.app:
# 生成应用上下文AppContext
app_ctx = self.app.app_context()
# 将应用上下文推入栈中
app_ctx.push()
self._implicit_app_ctx_stack.append(app_ctx)
else:
self._implicit_app_ctx_stack.append(None)
if hasattr(sys, 'exc_clear'):
sys.exc_clear()
# 将请求上下文推入栈中
_request_ctx_stack.push(self)
# 创建session 对象
if self.session is None:
session_interface = self.app.session_interface
self.session = session_interface.open_session(
self.app, self.request
)
if self.session is None:
self.session = session_interface.make_null_session(self.app)
对应 RequestContext.pop()
代码如下,请求上下文和应用上下文同时出栈
class RequestContext(object):
"""
请求上下文中包含了请求相关的所有信息
"""
...
def pop(self, exc=_sentinel):
app_ctx = self._implicit_app_ctx_stack.pop()
try:
clear_request = False
if not self._implicit_app_ctx_stack:
self.preserved = False
self._preserved_exc = None
if exc is _sentinel:
exc = sys.exc_info()[1]
# 执行所有使用teardown_request 钩子注册的函数
self.app.do_teardown_request(exc)
# If this interpreter supports clearing the exception information
# we do that now. This will only go into effect on Python 2.x,
# on 3.x it disappears automatically at the end of the exception
# stack.
if hasattr(sys, "exc_clear"):
sys.exc_clear()
request_close = getattr(self.request, "close", None)
if request_close is not None:
request_close()
clear_request = True
finally:
rv = _request_ctx_stack.pop()
# get rid of circular dependencies at the end of the request
# so that we don't require the GC to be active.
if clear_request:
rv.request.environ["werkzeug.request"] = None
# Get rid of the app as well if necessary.
if app_ctx is not None:
app_ctx.pop(exc)
assert rv is self, "Popped wrong request context. (%r instead of %r)" % (
rv,
self,
)
def auto_pop(self, exc):
# 异常发生时需要保持上下文以便进行相关操作,比如在页面的交互式调试器中执行操作或是测试
if self.request.environ.get("flask._preserve_context") or (
exc is not None and self.app.preserve_context_on_exception
):
self.preserved = True
self._preserved_exc = exc
# 没有异常发生时调用pop()方法移除上下文
else:
self.pop(exc)
既然是上下文对象 , 也就以为着在 RequestContext
中必然定义了 __enter__
与 __exit__
:
def __enter__(self):
# 将RequestContext对象压入栈中并返回
self.push()
return self
def __exit__(self, exc_type, exc_value, tb):
# 关闭上下文环境时从栈中弹出
self.auto_pop(exc_value)
if BROKEN_PYPY_CTXMGR_EXIT and exc_type is not None:
reraise(exc_type, exc_value, tb)
所以我们可以使用 with
来开启上下文环境
from flask import Flask
from flask.globals import _request_ctx_stack
app = Flask(__name__)
# 如果你在请求开始前或者请求结束后查看请求上下文栈中的stack
# 很不幸,请求开始前还没有这一属性
# 请求结束后,这一属性也被销毁,因为请求上下文对象销毁了
with app.test_request_context('/?next=http://example.com/') as rqc:
print(rqc.request)
print(_request_ctx_stack._local.stack)
"""
执行结果:
<Request 'http://localhost/?next=http:%2F%2Fexample.com%2F' [GET]>
[<RequestContext 'http://localhost/?next=http:%2F%2Fexample.com%2F' [GET] of ex1>]
"""
应用上下文
应用上下文会按需自动创建和销毁 , 如在将请求上下文对象压入栈中时 , 如果应用上下文栈中没有 , 则会先创建应用上下文 , 它不会在线程间移动 , 并且也不会在请求间共享。
应用上下文源码
class AppContext(object):
def __init__(self, app):
self.app = app
self.url_adapter = app.create_url_adapter(None)
self.g = app.app_ctx_globals_class()
# 引用计数,以追踪被压入栈的次数
self._refcnt = 0
def push(self):
"""Binds the app context to the current context."""
self._refcnt += 1
if hasattr(sys, "exc_clear"):
sys.exc_clear()
_app_ctx_stack.push(self)
appcontext_pushed.send(self.app)
def pop(self, exc=_sentinel):
"""Pops the app context."""
try:
self._refcnt -= 1
if self._refcnt <= 0:
if exc is _sentinel:
exc = sys.exc_info()[1]
self.app.do_teardown_appcontext(exc)
finally:
rv = _app_ctx_stack.pop()
assert rv is self, "Popped wrong app context. (%r instead of %r)" % (rv, self)
appcontext_popped.send(self.app)
def __enter__(self):
self.push()
return self
def __exit__(self, exc_type, exc_value, tb):
self.pop(exc_value)
if BROKEN_PYPY_CTXMGR_EXIT and exc_type is not None:
reraise(exc_type, exc_value, tb)
总结
- 需要保存请求相关的信息——有了请求上下文。
- 为了更好地分离程序的状态,应用起来更加灵活——有了程序上下文。
- 为了让上下文对象可以在全局动态访问,而不用显式地传入视图函数,同时确保线程安全——有了Local(本地线程)。
- 为了支持多个程序——有了Local Stack(本地堆栈)。
参考资料
flask 源码解析:上下文
Werkzeug(Flask)之Local、LocalStack和LocalProxy
Flask的Context(上下文)学习笔记
ThreadLocal