信号就是在框架核心功能或者一些 Flask 扩展发生动作时发送的通知 , 利用信号可以实现一部分的业务解藕。

在 Flask 中 , 信号功能由 Blinker 库提供 , 如果没有安装该库就无法使用信号功能 , 但是不会影响其他功能 , 因为如果没有该库 , Flask 将提供一个假的信号 , flask.signals.py 中 :

  1. signals_available = False
  2. try:
  3. from blinker import Namespace
  4. signals_available = True
  5. except ImportError:
  6. class Namespace(object):
  7. def signal(self, name, doc=None):
  8. return _FakeSignal(name, doc)
  9. class _FakeSignal(object):
  10. """If blinker is unavailable, create a fake class with the same
  11. interface that allows sending of signals but will fail with an
  12. error on anything else. Instead of doing anything on send, it
  13. will just ignore the arguments and do nothing instead.
  14. """
  15. def __init__(self, name, doc=None):
  16. self.name = name
  17. self.__doc__ = doc
  18. def _fail(self, *args, **kwargs):
  19. raise RuntimeError('signalling support is unavailable '
  20. 'because the blinker library is '
  21. 'not installed.')
  22. send = lambda *a, **kw: None
  23. connect = disconnect = has_receivers_for = receivers_for = \
  24. temporarily_connected_to = connected_to = _fail
  25. del _fail
  26. # The namespace for code signals. If you are not Flask code, do
  27. # not put signals in here. Create your own namespace instead.
  28. _signals = Namespace()

需要安装 blinker

  1. $ pip install blinker

下面是一个 Blinker 的示例:

  1. from blinker import signal
  2. # 创建信号
  3. started = signal('test-started')
  4. def each(round):
  5. print("Round {}!".format(round))
  6. def round_two(round):
  7. print("Only {}".format(round))
  8. # 订阅信号,each为接收者
  9. started.connect(each)
  10. # round_two为接收者,sender为发送者
  11. # 表示只有发送者为2时才接收
  12. started.connect(round_two, sender=2)
  13. for round in range(1,4):
  14. # 发送信号
  15. started.send(round)

Flask 中有一些钩子 , 如 before_request 和 after_request , 这些钩子不需要 Blinker 库并且允许你改变请求对象 (request) 或者响应对象 (response) , 而信号和钩子做的事情很像 , 只不过信号并不对请求对象和响应对象做改变 , 仅承担记录和通知的工作

内置信号

在 flask.signals.py 中我们可以看到 , Flask 内置了 10 个信号 :

  1. # 模板渲染成功时发送
  2. template_rendered = _signals.signal('template-rendered')
  3. # 模板渲染前发送
  4. before_render_template = _signals.signal('before-render-template')
  5. # 建立请求上下文后,在请求处理开始前发送
  6. request_started = _signals.signal('request-started')
  7. # 在响应发送给客户端之前发送
  8. request_finished = _signals.signal('request-finished')
  9. # 请求销毁时发送,无论请求成败都会发送
  10. request_tearing_down = _signals.signal('request-tearing-down')
  11. # 请求处理抛出异常时发送
  12. got_request_exception = _signals.signal('got-request-exception')
  13. # 应用上下文销毁时发送
  14. appcontext_tearing_down = _signals.signal('appcontext-tearing-down')
  15. # 应用上下文进栈中时发送
  16. appcontext_pushed = _signals.signal('appcontext-pushed')
  17. # 应用上下文出栈时发送
  18. appcontext_popped = _signals.signal('appcontext-popped')
  19. # 调用flask在其中添加数据时发送
  20. message_flashed = _signals.signal('message-flashed')

创建信号

我们以 request_started 为例来看看其内部实现 :

  1. from blinker import Namespace
  2. _signals = Namespace()
  3. # 调用Namespace对象的signal方法
  4. # 完成信号对象的创建,并使其成为全局引用
  5. request_started = _signals.signal('request-started')

Namespace.signal() 如下 :

  1. class Namespace(dict):
  2. """A mapping of signal names to signals."""
  3. def signal(self, name, doc=None):
  4. """
  5. 返回NamedSignal对象
  6. """
  7. try:
  8. return self[name]
  9. except KeyError:
  10. # Namespace为内置对象dict的派生类,
  11. # 设置并返回值,
  12. # self.request-started = NameSignal('request-started')
  13. return self.setdefault(name, NamedSignal(name, doc))

订阅信号

如果我们要使用内置信号 , 那么首先我们需要订阅信号 , 也就是使用 Signal.connect() 方法

  1. from flask import Flask, request_started
  2. app = Flask(__name__)
  3. # log_reqeust函数为接收方,app为发送方
  4. # 对于接收函数的参数,第一个位置不可缺省,
  5. # 因为在send调用该函数时,内部传入了一个sender实参
  6. def log_request(sender, **extra):
  7. print('Before the request comes ...')
  8. # 订阅信号
  9. request_started.connect(log_request, app)
  10. @app.route('/index')
  11. def index():
  12. return 'index page'
  13. if __name__ == '__main__':
  14. app.run()

connect() 源码如下 :

  1. def connect(self, receiver, sender=ANY, weak=True):
  2. """
  3. Connect *receiver* to signal events sent by *sender*.
  4. receiver:为一个可调用对象
  5. """
  6. receiver_id = hashable_identity(receiver)
  7. if weak:
  8. # <weakref at 0x000002129F72EE28; to 'function' at 0x000002129CBE7F28 (log_request)>
  9. # receiver将在send时被调用,self._cleanup_receiver
  10. receiver_ref = reference(receiver, self._cleanup_receiver)
  11. receiver_ref.receiver_id = receiver_id
  12. else:
  13. receiver_ref = receiver
  14. if sender is ANY:
  15. sender_id = ANY_ID
  16. else:
  17. sender_id = hashable_identity(sender)
  18. self.receivers.setdefault(receiver_id, receiver_ref)
  19. # self._by_sender与self._by_receiver为两个默认字典,其value默认为set
  20. # {sender_id:{receiver_id,}}
  21. self._by_sender[sender_id].add(receiver_id)
  22. # {receiver_id:{sender_id,}}
  23. self._by_receiver[receiver_id].add(sender_id)
  24. del receiver_ref
  25. # 此时self._weak_senders为空,所以以下不会执行
  26. if sender is not ANY and sender_id not in self._weak_senders:
  27. # wire together a cleanup for weakref-able senders
  28. try:
  29. sender_ref = reference(sender, self._cleanup_sender)
  30. sender_ref.sender_id = sender_id
  31. except TypeError:
  32. pass
  33. else:
  34. self._weak_senders.setdefault(sender_id, sender_ref)
  35. del sender_ref
  36. # 此处条件不成立,也不会执行
  37. if ('receiver_connected' in self.__dict__ and
  38. self.receiver_connected.receivers):
  39. try:
  40. self.receiver_connected.send(self,
  41. receiver=receiver,
  42. sender=sender,
  43. weak=weak)
  44. except:
  45. self.disconnect(receiver, sender)
  46. raise
  47. # receiver_connected为空
  48. if receiver_connected.receivers and self is not receiver_connected:
  49. try:
  50. receiver_connected.send(self,
  51. receiver_arg=receiver,
  52. sender_arg=sender,
  53. weak_arg=weak)
  54. except:
  55. self.disconnect(receiver, sender)
  56. raise
  57. return receiver

发送信号

信号的发送是通过 Signal.send() 来完成的 , 而这一步早已经被定义在 Flask 对象中了 , 如下 :

  1. def full_dispatch_request(self):
  2. self.try_trigger_before_first_request_functions()
  3. try:
  4. # 请求处理前发送信号
  5. request_started.send(self)
  6. rv = self.preprocess_request()
  7. if rv is None:
  8. # 分派请求
  9. rv = self.dispatch_request()
  10. except Exception as e:
  11. rv = self.handle_user_exception(e)
  12. return self.finalize_request(rv)

Signal.send() 如下 :

  1. def send(self, *sender, **kwargs):
  2. # Using '*sender' rather than 'sender=None' allows 'sender' to be
  3. # used as a keyword argument- i.e. it's an invisible name in the
  4. # function signature.
  5. if len(sender) == 0:
  6. sender = None
  7. elif len(sender) > 1:
  8. raise TypeError('send() accepts only one positional argument, '
  9. '%s given' % len(sender))
  10. else:
  11. # 取*sender元组中的第一个元素,即self (app)
  12. sender = sender[0]
  13. if not self.receivers:
  14. return []
  15. else:
  16. # 返回并完成调用
  17. return [(receiver, receiver(sender, **kwargs))
  18. for receiver in self.receivers_for(sender)]

Signal.receivers_for() 如下 :

  1. def receivers_for(self, sender):
  2. # self.receivers在信号订阅时被设置
  3. if self.receivers:
  4. sender_id = hashable_identity(sender)
  5. if sender_id in self._by_sender:
  6. # 按照上面的例子我们使用的sender不是ANY,
  7. # 所以self._by_sender[ANY_ID]为一个空集合,
  8. # {sender_id:{receiver_id,}}
  9. # self._by_sender[sender_id]为本例ids
  10. ids = (self._by_sender[ANY_ID] |
  11. self._by_sender[sender_id])
  12. else:
  13. ids = self._by_sender[ANY_ID].copy()
  14. for receiver_id in ids:
  15. # 根据receiver_id获取weakref对象
  16. receiver = self.receivers.get(receiver_id)
  17. if receiver is None:
  18. continue
  19. if isinstance(receiver, WeakTypes):
  20. # strong为订阅函数,即本例的log_reqeust
  21. # 这里你可能会疑惑,见下
  22. strong = receiver()
  23. if strong is None:
  24. # 释放信号
  25. self._disconnect(receiver_id, ANY_ID)
  26. continue
  27. receiver = strong
  28. # 返回函数对象
  29. yield receiver

在上面这段代码中 , 对于 strong = receiver() 我们知道 , WeakTypes = (ref, BoundMethodWeakref) , 而在这两个类型中 , ref 才是正主 ; 不用想我们也知道 , ref 也就是 ReferenceType 中必然有 call 方法 , 但是该方法仅仅一个 pass 摆在那里 , 而调用的返回值却返回了我们的订阅函数 , 这不正常
于是 , 在 ReferenceType 的上方我找到了说明 , Weak-reference support module
这个类型是一个弱引用类型 , 它是一个特殊的存在 , 当你对弱引用对象进行引用时 , 并不能保持该类对象的活动 , 只有通过调用引用判断 ; 如果该引用还存活着 , 那么将返回其引用对象 , 否则将会进行回调
大致过程如下 :

  1. # 依次调用代码
  2. receiver_ref = reference(receiver, self._cleanup_receiver)
  3. weak = callable_reference(object, callback)
  4. return annotatable_weakref(object, callback)
  5. class annotatable_weakref(ref):

弱引用对象没有属性或方法 , 如下有一个示例 :

  1. import weakref
  2. class Foo:
  3. pass
  4. # 实例化Foo
  5. o = Foo()
  6. # 包装成弱引用对象
  7. r = weakref.ref(o)
  8. # 调用弱引用对象
  9. r_result = r()
  10. print(o is r_result)
  11. """
  12. 执行结果:
  13. True
  14. """

弱引用详见 : weakref

最后 , 对于其它信号的发送相关代码位置 , 我们可以通过导入信息来查看 , 导入信息如下 :

  1. # app.py (5个)
  2. from .signals import appcontext_tearing_down, got_request_exception, \
  3. request_finished, request_started, request_tearing_down
  4. # ctx.py (2个)
  5. from .signals import appcontext_pushed, appcontext_popped
  6. # templating.py (2个)
  7. from .signals import template_rendered, before_render_template
  8. # helpers.py (1个)
  9. from .signals import message_flashed

自定义信号

我们可以直接使用 Blinker 创建信号 , 如下 , 定义一中对于上传大文件的信号 :

  1. from blinker import Namespace
  2. web_signals = Namespace()
  3. large_file_saved = web_signals.signal('large-file-saved')

装饰器方式

在 Signal 对象中还有一个 connect_via() 装饰器订阅信号

  1. def connect_via(self, sender, weak=False):
  2. def decorator(fn):
  3. self.connect(fn, sender, weak)
  4. return fn
  5. return decorator

以 flask.appcontext_tearing_down 为例:

  1. from flask import Flask, appcontext_tearing_down, session
  2. app = Flask(__name__)
  3. @appcontext_tearing_down.connect_via(app)
  4. def close_db_connection(sender, **extra):
  5. print('Database connection closed ...')
  6. @app.route('/index')
  7. def index():
  8. return 'index page'
  9. if __name__ == '__main__':
  10. app.run()

另外在 Flask-Login 插件中还带了 6 种信号 , 可以基于其中的信号做一些额外工作