未命名作品.jpg


Flask简介

Flask是一个基于Python开发并且依赖jinja2模板和Werkzeug WSGI服务的一个微型框架,对于Werkzeug本质是Socket服务端,其用于接收http请求并对请求进行预处理,然后触发Flask框架,开发人员基于Flask框架提供的功能对请求进行相应的处理,并返回给用户,如果要返回给用户复杂的内容时,需要借助jinja2模板来实现对模板的处理,即:将模板和数据进行渲染,将渲染后的字符串返回给用户浏览器。
“微”(micro) 并不表示你需要把整个 Web 应用塞进单个 Python 文件(虽然确实可以 ),也不意味着 Flask 在功能上有所欠缺。微框架中的“微”意味着 Flask 旨在保持核心简单而易于扩展。Flask 不会替你做出太多决策——比如使用何种数据库。而那些 Flask 所选择的——比如使用何种模板引擎——则很容易替换。除此之外的一切都由可由你掌握。如此,Flask 可以与您珠联璧合。
默认情况下,Flask 不包含数据库抽象层、表单验证,或是其它任何已有多种库可以胜任的功能。然而,Flask 支持用扩展来给应用添加这些功能,如同是 Flask 本身实现的一样。众多的扩展提供了数据库集成、表单验证、上传处理、各种各样的开放认证技术等功能。Flask 也许是“微小”的,但它已准备好在需求繁杂的生产环境中投入使用

wsgiref

最简单的Web应用就是先把HTML用文件保存好,用一个现成的HTTP服务器软件,接收用户请求,从文件中读取HTML,返回。
如果要动态生成HTML,就需要把上述步骤自己来实现。不过,接受HTTP请求、解析HTTP请求、发送HTTP响应都是苦力活,如果我们自己来写这些底层代码,还没开始写动态HTML呢,就得花个把月去读HTTP规范。
正确的做法是底层代码由专门的服务器软件实现,我们用Python专注于生成HTML文档。因为我们不希望接触到TCP连接、HTTP原始请求和响应格式,所以,需要一个统一的接口协议来实现这样的服务器软件,让我们专心用Python编写Web业务。这个接口就是WSGI:Web Server Gateway Interface。而wsgiref模块就是python基于wsgi协议开发的服务模块

  1. from wsgiref.simple_server import make_server
  2. def mya(environ, start_response):
  3. print(environ)
  4. start_response('200 OK', [('Content-Type', 'text/html')])
  5. if environ.get('PATH_INFO') == '/index':
  6. with open('index.html','rb') as f:
  7. data=f.read()
  8. elif environ.get('PATH_INFO') == '/login':
  9. with open('login.html', 'rb') as f:
  10. data = f.read()
  11. else:
  12. data=b'<h1>Hello, web!</h1>'
  13. return [data]
  14. if __name__ == '__main__':
  15. myserver = make_server('', 8011, mya)
  16. print('监听8010')
  17. myserver.serve_forever()

1.安装

pip3 install flask

2.werkzeug简介

Werkzeug是一个WSGI工具包,他可以作为一个Web框架的底层库。这里稍微说一下, werkzeug 不是一个web服务器,也不是一个web框架,而是一个工具包,官方的介绍说是一个 WSGI 工具包,它可以作为一个 Web 框架的底层库,因为它封装好了很多 Web 框架的东西,例如 Request,Response 等等
代码示例:

  1. from werkzeug.wrappers import Request, Response
  2. @Request.application
  3. def hello(request):
  4. return Response('Hello World!')
  5. if __name__ == '__main__':
  6. from werkzeug.serving import run_simple
  7. run_simple('localhost', 4000, hello)

3.flask快速使用

  1. from flask import Flask
  2. # 实例化产生一个Flask对象
  3. app = Flask(__name__)
  4. # 将 '/'和视图函数hello_workd的对应关系添加到路由中
  5. @app.route('/') # 1. v=app.route('/') 2. v(hello_world)
  6. def hello_world():
  7. return 'Hello World!'
  8. if __name__ == '__main__':
  9. app.run() # 最终调用了run_simple()

案例:登录,显示用户信息

main.py

  1. from flask import Flask,render_template,request,redirect,session,url_for
  2. app = Flask(__name__)
  3. app.debug = True
  4. app.secret_key = 'sdfsdfsdfsdf'
  5. USERS = {
  6. 1:{'name':'张三','age':18,'gender':'男','text':"道路千万条"},
  7. 2:{'name':'李四','age':28,'gender':'男','text':"安全第一条"},
  8. 3:{'name':'王五','age':18,'gender':'女','text':"行车不规范"},
  9. }
  10. @app.route('/detail/<int:nid>',methods=['GET'])
  11. def detail(nid):
  12. user = session.get('user_info')
  13. if not user:
  14. return redirect('/login')
  15. info = USERS.get(nid)
  16. return render_template('detail.html',info=info)
  17. @app.route('/index',methods=['GET'])
  18. def index():
  19. user = session.get('user_info')
  20. if not user:
  21. # return redirect('/login')
  22. url = url_for('l1')
  23. return redirect(url)
  24. return render_template('index.html',user_dict=USERS)
  25. @app.route('/login',methods=['GET','POST'],endpoint='l1')
  26. def login():
  27. if request.method == "GET":
  28. return render_template('login.html')
  29. else:
  30. # request.query_string
  31. user = request.form.get('user')
  32. pwd = request.form.get('pwd')
  33. if user == 'cxw' and pwd == '123':
  34. session['user_info'] = user
  35. return redirect('http://www.baidu.com')
  36. return render_template('login.html',error='用户名或密码错误')
  37. if __name__ == '__main__':
  38. app.run()

detail.html

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>Title</title>
  6. </head>
  7. <body>
  8. <h1>详细信息 {{info.name}}</h1>
  9. <div>
  10. {{info.text}}
  11. </div>
  12. </body>
  13. </html>

index.html

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>Title</title>
  6. </head>
  7. <body>
  8. <h1>用户列表</h1>
  9. <table>
  10. {% for k,v in user_dict.items() %}
  11. <tr>
  12. <td>{{k}}</td>
  13. <td>{{v.name}}</td>
  14. <td>{{v['name']}}</td>
  15. <td>{{v.get('name')}}</td>
  16. <td><a href="/detail/{{k}}">查看详细</a></td>
  17. </tr>
  18. {% endfor %}
  19. </table>
  20. </body>
  21. </html>

login.html

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>Title</title>
  6. </head>
  7. <body>
  8. <h1>用户登录</h1>
  9. <form method="post">
  10. <input type="text" name="user">
  11. <input type="text" name="pwd">
  12. <input type="submit" value="登录">{{error}}
  13. </form>
  14. </body>
  15. </html>

4.配置文件

flask中的配置文件是一个flask.config.Config对象(继承字典),默认配置为:

  1. {
  2. 'DEBUG': get_debug_flag(default=False), 是否开启Debug模式
  3. 'TESTING': False, 是否开启测试模式
  4. 'PROPAGATE_EXCEPTIONS': None,
  5. 'PRESERVE_CONTEXT_ON_EXCEPTION': None,
  6. 'SECRET_KEY': None,
  7. 'PERMANENT_SESSION_LIFETIME': timedelta(days=31),
  8. 'USE_X_SENDFILE': False,
  9. 'LOGGER_NAME': None,
  10. 'LOGGER_HANDLER_POLICY': 'always',
  11. 'SERVER_NAME': None,
  12. 'APPLICATION_ROOT': None,
  13. 'SESSION_COOKIE_NAME': 'session',
  14. 'SESSION_COOKIE_DOMAIN': None,
  15. 'SESSION_COOKIE_PATH': None,
  16. 'SESSION_COOKIE_HTTPONLY': True,
  17. 'SESSION_COOKIE_SECURE': False,
  18. 'SESSION_REFRESH_EACH_REQUEST': True,
  19. 'MAX_CONTENT_LENGTH': None,
  20. 'SEND_FILE_MAX_AGE_DEFAULT': timedelta(hours=12),
  21. 'TRAP_BAD_REQUEST_ERRORS': False,
  22. 'TRAP_HTTP_EXCEPTIONS': False,
  23. 'EXPLAIN_TEMPLATE_LOADING': False,
  24. 'PREFERRED_URL_SCHEME': 'http',
  25. 'JSON_AS_ASCII': True,
  26. 'JSON_SORT_KEYS': True,
  27. 'JSONIFY_PRETTYPRINT_REGULAR': True,
  28. 'JSONIFY_MIMETYPE': 'application/json',
  29. 'TEMPLATES_AUTO_RELOAD': None,
  30. }

方式一

  1. app.config['DEBUG'] = True
  2. PS 由于Config对象本质上是字典,所以还可以使用app.config.update(...)

方式二

  1. #通过py文件配置
  2. app.config.from_pyfile("python文件名称")
  3. 如:
  4. settings.py
  5. DEBUG = True
  6. app.config.from_pyfile("settings.py")
  7. #通过环境变量配置
  8. app.config.from_envvar("环境变量名称")
  9. #app.config.from_pyfile(os.environ['YOURAPPLICATION_SETTINGS'])
  10. 环境变量的值为python文件名称名称,内部调用from_pyfile方法
  11. app.config.from_json("json文件名称")
  12. JSON文件名称,必须是json格式,因为内部会执行json.loads
  13. app.config.from_mapping({'DEBUG': True})
  14. 字典格式
  15. app.config.from_object("python类或类的路径")
  16. app.config.from_object('pro_flask.settings.TestingConfig')
  17. settings.py
  18. class Config(object):
  19. DEBUG = False
  20. TESTING = False
  21. DATABASE_URI = 'sqlite://:memory:'
  22. class ProductionConfig(Config):
  23. DATABASE_URI = 'mysql://user@localhost/foo'
  24. class DevelopmentConfig(Config):
  25. DEBUG = True
  26. class TestingConfig(Config):
  27. TESTING = True
  28. PS: sys.path中已经存在路径开始写
  29. PS: settings.py文件默认路径要放在程序root_path目录,如果instance_relative_configTrue,则就是instance_path目录(Flask对象init方法的参数)

5.路由系统

典型写法

  1. @app.route('/detail/<int:nid>',methods=['GET'],endpoint='detail')

默认转换器

  1. DEFAULT_CONVERTERS = {
  2. 'default': UnicodeConverter,
  3. 'string': UnicodeConverter,
  4. 'any': AnyConverter,
  5. 'path': PathConverter,
  6. 'int': IntegerConverter,
  7. 'float': FloatConverter,
  8. 'uuid': UUIDConverter,
  9. }

路由系统本质

  1. """
  2. 1. decorator = app.route('/',methods=['GET','POST'],endpoint='n1')
  3. def route(self, rule, **options):
  4. # app对象
  5. # rule= /
  6. # options = {methods=['GET','POST'],endpoint='n1'}
  7. def decorator(f):
  8. endpoint = options.pop('endpoint', None)
  9. self.add_url_rule(rule, endpoint, f, **options)
  10. return f
  11. return decorator
  12. 2. @decorator
  13. decorator(index)
  14. """
  15. #同理
  16. def login():
  17. return '登录'
  18. app.add_url_rule('/login', 'n2', login, methods=['GET',"POST"])
  19. #与django路由类似
  20. #django与flask路由:flask路由基于装饰器,本质是基于:add_url_rule
  21. #add_url_rule 源码中,endpoint如果为空,endpoint = _endpoint_from_view_func(view_func),最终取view_func.__name__(函数名)

CBV(源码分析)

  1. def auth(func):
  2. def inner(*args, **kwargs):
  3. print('before')
  4. result = func(*args, **kwargs)
  5. print('after')
  6. return result
  7. return inner
  8. class IndexView(views.View):
  9. methods = ['GET']
  10. decorators = [auth, ]
  11. def dispatch_request(self):
  12. print('Index')
  13. return 'Index!'
  14. app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) # name=endpoint
  15. #或者,通常用此方式
  16. class IndexView(views.MethodView):
  17. methods = ['GET']
  18. decorators = [auth, ]
  19. def get(self):
  20. return 'Index.GET'
  21. def post(self):
  22. return 'Index.POST'
  23. app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) # name=endpoint

app.add_url_rule参数

  1. @app.routeapp.add_url_rule参数:
  2. rule, URL规则
  3. view_func, 视图函数名称
  4. defaults = None, 默认值, URL中无参数,函数需要参数时,使用defaults = {'k': 'v'}
  5. 为函数提供参数
  6. endpoint = None, 名称,用于反向生成URL,即: url_for('名称')
  7. methods = None, 允许的请求方式,如:["GET", "POST"]
  8. #对URL最后的 / 符号是否严格要求
  9. strict_slashes = None
  10. '''
  11. @app.route('/index', strict_slashes=False)
  12. #访问http://www.xx.com/index/ 或http://www.xx.com/index均可
  13. @app.route('/index', strict_slashes=True)
  14. #仅访问http://www.xx.com/index
  15. '''
  16. #重定向到指定地址
  17. redirect_to = None,
  18. '''
  19. @app.route('/index/<int:nid>', redirect_to='/home/<nid>')
  20. '''
  21. #子域名访问
  22. subdomain = None,
  23. '''
  24. #C:\Windows\System32\drivers\etc\hosts
  25. 127.0.0.1 www.liuqingzheng.com
  26. 127.0.0.1 admin.liuqingzheng.com
  27. 127.0.0.1 buy.liuqingzheng.com
  28. from flask import Flask, views, url_for
  29. app = Flask(import_name=__name__)
  30. app.config['SERVER_NAME'] = 'liuqingzheng.com:5000'
  31. @app.route("/", subdomain="admin")
  32. def static_index():
  33. """Flask supports static subdomains
  34. This is available at static.your-domain.tld"""
  35. return "static.your-domain.tld"
  36. #可以传入任意的字符串,如传入的字符串为aa,显示为 aa.liuqingzheng.com
  37. @app.route("/dynamic", subdomain="<username>")
  38. def username_index(username):
  39. """Dynamic subdomains are also supported
  40. Try going to user1.your-domain.tld/dynamic"""
  41. return username + ".your-domain.tld"
  42. if __name__ == '__main__':
  43. app.run()
  44. 访问:
  45. http://www.liuqingzheng.com:5000/dynamic
  46. http://admin.liuqingzheng.com:5000/dynamic
  47. http://buy.liuqingzheng.com:5000/dynamic
  48. '''

支持正则

  1. #1 写类,继承BaseConverter
  2. #2 注册:app.url_map.converters['regex'] = RegexConverter
  3. # 3 使用:@app.route('/index/<regex("\d+"):nid>') 正则表达式会当作第二个参数传递到类中
  4. from flask import Flask, views, url_for
  5. from werkzeug.routing import BaseConverter
  6. app = Flask(import_name=__name__)
  7. class RegexConverter(BaseConverter):
  8. """
  9. 自定义URL匹配正则表达式
  10. """
  11. def __init__(self, map, regex):
  12. super(RegexConverter, self).__init__(map)
  13. self.regex = regex
  14. def to_python(self, value):
  15. """
  16. 路由匹配时,匹配成功后传递给视图函数中参数的值
  17. """
  18. return int(value)
  19. def to_url(self, value):
  20. """
  21. 使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
  22. """
  23. val = super(RegexConverter, self).to_url(value)
  24. return val
  25. # 添加到flask中
  26. app.url_map.converters['regex'] = RegexConverter
  27. @app.route('/index/<regex("\d+"):nid>')
  28. def index(nid):
  29. print(url_for('index', nid='888'))
  30. return 'Index'
  31. if __name__ == '__main__':
  32. app.run()

6.模版

6.1渲染变量

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>Title</title>
  6. </head>
  7. <body>
  8. <h1>用户列表</h1>
  9. <table>
  10. {% for k,v in user_dict.items() %}
  11. <tr>
  12. <td>{{k}}</td>
  13. <td>{{v.name}}</td>
  14. <td>{{v['name']}}</td>
  15. <td>{{v.get('name')}}</td>
  16. <td><a href="/detail/{{k}}">查看详细</a></td>
  17. </tr>
  18. {% endfor %}
  19. </table>
  20. </body>
  21. </html>

6.2变量的循环

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>Title</title>
  6. </head>
  7. <body>
  8. <h1>用户列表</h1>
  9. <table>
  10. {% for k,v in user_dict.items() %}
  11. <tr>
  12. <td>{{k}}</td>
  13. <td>{{v.name}}</td>
  14. <td>{{v['name']}}</td>
  15. <td>{{v.get('name')}}</td>
  16. <td><a href="/detail/{{k}}">查看详细</a></td>
  17. </tr>
  18. {% endfor %}
  19. </table>
  20. </body>
  21. </html>

6.3逻辑判断

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>Title</title>
  6. </head>
  7. <body>
  8. <h1>用户列表</h1>
  9. <table>
  10. {% if name %}
  11. <h1>Hello {{ name }}!</h1>
  12. {% else %}
  13. <h1>Hello World!</h1>
  14. {% endif %}
  15. </table>
  16. </body>
  17. </html>

比django中多可以加括号,执行函数,传参数

  1. from flask import Flask,render_template,Markup,jsonify,make_response
  2. app = Flask(__name__)
  3. def func1(arg):
  4. return Markup("<input type='text' value='%s' />" %(arg,))
  5. @app.route('/')
  6. def index():
  7. return render_template('index.html',ff = func1)
  8. if __name__ == '__main__':
  9. app.run()

index.html

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>Title</title>
  6. </head>
  7. <body>
  8. {{ff('六五')}}
  9. {{ff('六五')|safe}}
  10. </body>
  11. </html>

注意:
1.Markup等价django的mark_safe ,
2.extends,include一模一样

7.请求响应

  1. from flask import Flask
  2. from flask import request
  3. from flask import render_template
  4. from flask import redirect
  5. from flask import make_response
  6. app = Flask(__name__)
  7. @app.route('/login.html', methods=['GET', "POST"])
  8. def login():
  9. # 请求相关信息
  10. # request.method 提交的方法
  11. # request.args get请求提及的数据
  12. # request.form post请求提交的数据
  13. # request.values post和get提交的数据总和
  14. # request.cookies 客户端所带的cookie
  15. # request.headers 请求头
  16. # request.path 不带域名,请求路径
  17. # request.full_path 不带域名,带参数的请求路径
  18. # request.script_root
  19. # request.url 带域名带参数的请求路径
  20. # request.base_url 带域名请求路径
  21. # request.url_root 域名
  22. # request.host_url 域名
  23. # request.host 127.0.0.1:500
  24. # request.files
  25. # obj = request.files['the_file_name']
  26. # obj.save('/var/www/uploads/' + secure_filename(f.filename))
  27. # 响应相关信息
  28. # return "字符串"
  29. # return render_template('html模板路径',**{})
  30. # return redirect('/index.html')
  31. #return jsonify({'k1':'v1'})
  32. # response = make_response(render_template('index.html'))
  33. # response是flask.wrappers.Response类型
  34. # response.delete_cookie('key')
  35. # response.set_cookie('key', 'value')
  36. # response.headers['X-Something'] = 'A value'
  37. # return response
  38. return "内容"
  39. if __name__ == '__main__':
  40. app.run()

8.session

  1. cookie:存放在客户端的键值对
  2. session:存放在客户端的键值对
  3. token:存放在客户端,通过算法来校验

在使用session之前必须现在设置一下密钥

  1. app.secret_key="asdas" #值随便

除请求对象之外,还有一个 session 对象。它允许你在不同请求间存储特定用户的信息。它是在 Cookies 的基础上实现的,并且对 Cookies 进行密钥签名要使用会话,你需要设置一个密钥。 (app.session_interface对象)

  1. 设置:session['username'] 'xxx'
  2. #在django中发什么三件事,1,生成一个随机的字符串 2 往数据库存 3 写入cookie返回浏览器
  3. #在flask中他没有数据库,但session是怎样实现的?
  4. # 生成一个密钥写入这个cookie,然后下次请求的时候,通过这个cookie解密,然后赋值给session
  5. #我们通过app.session_interface来查看
  6. 删除:session.pop('username', None)

app.session_interface中save_session的参数(设置cookie的参数)

  1. key,
  2. value='',
  3. max_age=None, 超时时间 cookie需要延续的时间(以秒为单位)如果参数是\ None`` ,这个cookie会延续到浏览器关闭为止
  4. expires=None, 超时时间(IE requires expires, so set it if hasn't been already.)
  5. path='/', Cookie生效的路径,/ 表示根路径,特殊的:根路径的cookie可以被任何url的页面访问,浏览器只会把cookie回传给带有该路径的页面,这样可以避免将cookie传给站点中的其他的应用。
  6. domain=None, Cookie生效的域名 你可用这个参数来构造一个跨站cookie。如, domain=".example.com"所构造的cookie对下面这些站点都是可读的:www.example.com 、 www2.example.com 和an.other.sub.domain.example.com 。如果该参数设置为 None ,cookie只能由设置它的站点读取
  7. secure=False, 浏览器将通过HTTPS来回传cookie
  8. httponly=False 只能http协议传输,无法被JavaScript获取(不是绝对,底层抓包可以获取到也可以被覆盖)

session源码的执行流程

  1. -save_seesion
  2. -响应的时候,把session中的值加密序列化放大到了cookie中,返回到浏览器中
  3. -open_session
  4. -请求来了,从cookie中取出值,反解,生成session对象,以后再视图函数中直接用sessoin就可以了。

9.闪现(message)

  1. -设置:flash('aaa')
  2. -取值:get_flashed_message()
  3. -
  4. -假设在a页面操作出错,跳转到b页面,在b页面显示a页面的错误信息

示例:

  1. from flask import Flask,flash,get_flashed_messages,request,redirect
  2. app = Flask(__name__)
  3. app.secret_key = 'asdfasdf'
  4. @app.route('/index')
  5. def index():
  6. # 从某个地方获取设置过的所有值,并清除。
  7. val = request.args.get('v')
  8. if val == 'oldboy':
  9. return 'Hello World!'
  10. flash('超时错误',category="x1")
  11. return "ssdsdsdfsd"
  12. # return redirect('/error')
  13. @app.route('/error')
  14. def error():
  15. """
  16. 展示错误信息
  17. :return:
  18. 如果get_flashed_messages(with_category=True)
  19. """
  20. data = get_flashed_messages(category_filter=['x1'])
  21. if data:
  22. msg = data[0]
  23. else:
  24. msg = "..."
  25. return "错误信息:%s" %(msg,)
  26. if __name__ == '__main__':
  27. app.run()

10.请求扩展

1 before_request

类比django中间件中的process_request,在请求收到之前绑定一个函数做一些事情

  1. #基于它做用户登录认证
  2. @app.before_request
  3. def process_request(*args,**kwargs):
  4. if request.path == '/login':
  5. return None
  6. user = session.get('user_info')
  7. if user:
  8. return None
  9. return redirect('/login')

2 after_request

类比django中间件中的process_response,每一个请求之后绑定一个函数,如果请求没有异常

  1. @app.after_request
  2. def process_response1(response):
  3. print('process_response1 走了')
  4. return response

3 before_first_request

第一次请求时,跟浏览器无关

  1. @app.before_first_request
  2. def first():
  3. pass

4 teardown_request

每一个请求之后绑定一个函数,即使遇到了异常

  1. @app.teardown_request
  2. def ter(e):
  3. pass

5 errorhandler

路径不存在时404,服务器内部错误500

  1. @app.errorhandler(404)
  2. def error_404(arg):
  3. return "404错误了"

6 template_global

标签

  1. @app.template_global()
  2. def sb(a1, a2):
  3. return a1 + a2
  4. #{{sb(1,2)}}

7 template_filter

过滤器

  1. @app.template_filter()
  2. def db(a1, a2, a3):
  3. return a1 + a2 + a3
  4. #{{ 1|db(2,3)}}

总结:
1 重点掌握before_request和after_request,
2 注意有多个的情况,执行顺序
3 before_request请求拦截后(也就是有return值),response所有都执行

11 中间件(了解)

  1. from flask import Flask
  2. app = Flask(__name__)
  3. @app.route('/')
  4. def index():
  5. return 'Hello World!'
  6. # 模拟中间件
  7. class Md(object):
  8. def __init__(self,old_wsgi_app):
  9. self.old_wsgi_app = old_wsgi_app
  10. def __call__(self, environ, start_response):
  11. print('开始之前')
  12. ret = self.old_wsgi_app(environ, start_response)
  13. print('结束之后')
  14. return ret
  15. if __name__ == '__main__':
  16. #1我们发现当执行app.run方法的时候,最终执行run_simple,最后执行app(),也就是在执行app.__call__方法
  17. #2 在__call__里面,执行的是self.wsgi_app().那我们希望在执行他本身的wsgi之前做点事情。
  18. #3 所以我们先用Md类中__init__,保存之前的wsgi,然后我们用将app.wsgi转化成Md的对象。
  19. #4 那执行新的的app.wsgi_app,就是执行Md的__call__方法。
  20. #把原来的wsgi_app替换为自定义的,
  21. app.wsgi_app = Md(app.wsgi_app)
  22. app.run()

请求所有的流程

  1. ctx = self.request_context(environ)
  2. error = None
  3. try:
  4. try:
  5. ctx.push()
  6. #根据路径去执行视图函数,视图类
  7. response = self.full_dispatch_request()
  8. except Exception as e:
  9. error = e
  10. response = self.handle_exception(e)
  11. except: # noqa: B001
  12. error = sys.exc_info()[1]
  13. raise
  14. return response(environ, start_response)
  15. finally:
  16. #不管出不出异常,都会走这里
  17. if self.should_ignore_error(error):
  18. error = None
  19. ctx.auto_pop(error)

12.蓝图

对程序进行目录结构划分

不使用蓝图,自己分文件

目录结构:

  1. -templates
  2. -views
  3. -__init__.py
  4. -user.py
  5. -order.py
  6. -app.py

app.py

  1. from views import app
  2. if __name__ == '__main__':
  3. app.run()

init.py

  1. from flask import Flask,request
  2. app = Flask(__name__)
  3. #不导入这个不行
  4. from . import account
  5. from . import order
  6. from . import user

user.py

  1. from . import app
  2. @app.route('/user')
  3. def user():
  4. return 'user'

order.py

  1. from . import app
  2. @app.route('/order')
  3. def order():
  4. return 'order'

使用蓝图之中小型系统

详见代码:proflask简单应用程序目录示例.zip
目录结构:

  1. -flask_pro
  2. -flask_test
  3. -__init__.py
  4. -static
  5. -templates
  6. -views
  7. -order.py
  8. -user.py
  9. -manage.py

_init.py

  1. from flask import Flask
  2. app=Flask(__name__)
  3. from flask_test.views import user
  4. from flask_test.views import order
  5. app.register_blueprint(user.us)
  6. app.register_blueprint(order.ord)

manage.py

  1. from flask_test import app
  2. if __name__ == '__main__':
  3. app.run(port=8008)

user.py

  1. from flask import Blueprint
  2. us=Blueprint('user',__name__)
  3. @us.route('/login')
  4. def login():
  5. return 'login'

order.py

  1. from flask import Blueprint
  2. ord=Blueprint('order',__name__)
  3. @ord.route('/test')
  4. def test():
  5. return 'order test'

使用蓝图之大型系统

详见代码:proflask大型应用目录示例.zip
总结:
1 xxx = Blueprint(‘account’, name,url_prefix=’/xxx’) :蓝图URL前缀,表示url的前缀,在该蓝图下所有url都加前缀
2 xxx = Blueprint(‘account’, name,url_prefix=’/xxx’,template_folder=’tpls’):给当前蓝图单独使用templates,向上查找,当前找不到,会找总templates
3 蓝图的befort_request,对当前蓝图有效
4 大型项目,可以模拟出类似于django中app的概念

13.请求上下文源码分析

  1. 第一阶段:将ctx(request,session)放到Local对象上
  2. 第二阶段:视图函数导入:request/session
  3. request.method
  4. -LocalProxy对象.method,执行getattr方法,getattr(self._get_current_object(), name)
  5. -self._get_current_object()返回return self.__local(),self.__local(),在LocakProxy实例化的时候,object.__setattr__(self, '_LocalProxy__local', local),此处local就是:partial(_lookup_req_object, 'request')
  6. -def _lookup_req_object(name):
  7. top = _request_ctx_stack.top #_request_ctx_stack 就是LocalStack()对象,top方法把ctx取出来
  8. if top is None:
  9. raise RuntimeError(_request_ctx_err_msg)
  10. return getattr(top, name)#获取ctx中的requestsession对象
  11. 第三阶段:请求处理完毕
  12. - 获取session并保存到cookie
  13. - ctx删除

程序运行,两个LocalStack()对象,一个里面放request和session,另一个放g和current_app

14.g对象

专门用来存储用户信息的g对象,g的全称的为global
g对象在一次请求中的所有的代码的地方,都是可以使用的

g对象和session的区别

  1. session对象是可以跨request的,只要session还未失效,不同的request的请求会获取到同一个session,但是g对象不是,g对象不需要管过期时间,请求一次就g对象就改变了一次,或者重新赋值了一次

15.flask-session

作用:将默认保存的签名cookie中的值 保存到 redis/memcached/file/Mongodb/SQLAlchemy
安装:pip3 install flask-session
使用1:

  1. from flask import Flask,session
  2. from flask_session import RedisSessionInterface
  3. import redis
  4. app = Flask(__name__)
  5. conn=redis.Redis(host='127.0.0.1',port=6379)
  6. #use_signer是否对key签名
  7. app.session_interface=RedisSessionInterface(conn,key_prefix='lqz')
  8. @app.route('/')
  9. def hello_world():
  10. session['name']='lqz'
  11. return 'Hello World!'
  12. if __name__ == '__main__':
  13. app.run()

使用2:

  1. from redis import Redis
  2. from flask.ext.session import Session
  3. app.config['SESSION_TYPE'] = 'redis'
  4. app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379')
  5. Session(app)

问题:设置cookie时,如何设定关闭浏览器则cookie失效。

  1. response.set_cookie('k','v',exipre=None)#这样设置即可
  2. #在session中设置
  3. app.session_interface=RedisSessionInterface(conn,key_prefix='lqz',permanent=False)
  4. #一般不用,我们一般都设置超时时间,多长时间后失效

问题:cookie默认超时时间是多少?如何设置超时时间

  1. #源码expires = self.get_expiration_time(app, session)
  2. 'PERMANENT_SESSION_LIFETIME': timedelta(days=31),#这个配置文件控制

16.数据库连接池

pymsql链接数据库

  1. import pymysql
  2. conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123456', db='s8day127db')
  3. cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
  4. # cursor.execute("select id,name from users where name=%s and pwd=%s",['lqz','123',])
  5. cursor.execute("select id,name from users where name=%(user)s and pwd=%(pwd)s",{'user':'lqz','pwd':'123'})
  6. obj = cursor.fetchone()
  7. conn.commit()
  8. cursor.close()
  9. conn.close()
  10. print(obj)

数据库连接池版

setting.py

  1. from datetime import timedelta
  2. from redis import Redis
  3. import pymysql
  4. from DBUtils.PooledDB import PooledDB, SharedDBConnection
  5. class Config(object):
  6. DEBUG = True
  7. SECRET_KEY = "umsuldfsdflskjdf"
  8. PERMANENT_SESSION_LIFETIME = timedelta(minutes=20)
  9. SESSION_REFRESH_EACH_REQUEST= True
  10. SESSION_TYPE = "redis"
  11. PYMYSQL_POOL = PooledDB(
  12. creator=pymysql, # 使用链接数据库的模块
  13. maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
  14. mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
  15. maxcached=5, # 链接池中最多闲置的链接,0和None不限制
  16. maxshared=3,
  17. # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
  18. blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
  19. maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
  20. setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
  21. ping=0,
  22. # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
  23. host='127.0.0.1',
  24. port=3306,
  25. user='root',
  26. password='123456',
  27. database='s8day127db',
  28. charset='utf8'
  29. )
  30. class ProductionConfig(Config):
  31. SESSION_REDIS = Redis(host='192.168.0.94', port='6379')
  32. class DevelopmentConfig(Config):
  33. SESSION_REDIS = Redis(host='127.0.0.1', port='6379')
  34. class TestingConfig(Config):
  35. pass

utils/sql.py

  1. import pymysql
  2. from settings import Config
  3. class SQLHelper(object):
  4. @staticmethod
  5. def open(cursor):
  6. POOL = Config.PYMYSQL_POOL
  7. conn = POOL.connection()
  8. cursor = conn.cursor(cursor=cursor)
  9. return conn,cursor
  10. @staticmethod
  11. def close(conn,cursor):
  12. conn.commit()
  13. cursor.close()
  14. conn.close()
  15. @classmethod
  16. def fetch_one(cls,sql,args,cursor =pymysql.cursors.DictCursor):
  17. conn,cursor = cls.open(cursor)
  18. cursor.execute(sql, args)
  19. obj = cursor.fetchone()
  20. cls.close(conn,cursor)
  21. return obj
  22. @classmethod
  23. def fetch_all(cls,sql, args,cursor =pymysql.cursors.DictCursor):
  24. conn, cursor = cls.open(cursor)
  25. cursor.execute(sql, args)
  26. obj = cursor.fetchall()
  27. cls.close(conn, cursor)
  28. return obj

使用:

  1. obj = SQLHelper.fetch_one("select id,name from users where name=%(user)s and pwd=%(pwd)s", form.data)

17.wtforms

安装:pip3 install wtforms

使用1:

  1. from flask import Flask, render_template, request, redirect
  2. from wtforms import Form
  3. from wtforms.fields import simple
  4. from wtforms import validators
  5. from wtforms import widgets
  6. app = Flask(__name__, template_folder='templates')
  7. app.debug = True
  8. class LoginForm(Form):
  9. # 字段(内部包含正则表达式)
  10. name = simple.StringField(
  11. label='用户名',
  12. validators=[
  13. validators.DataRequired(message='用户名不能为空.'),
  14. validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
  15. ],
  16. widget=widgets.TextInput(), # 页面上显示的插件
  17. render_kw={'class': 'form-control'}
  18. )
  19. # 字段(内部包含正则表达式)
  20. pwd = simple.PasswordField(
  21. label='密码',
  22. validators=[
  23. validators.DataRequired(message='密码不能为空.'),
  24. validators.Length(min=8, message='用户名长度必须大于%(min)d'),
  25. validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
  26. message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
  27. ],
  28. widget=widgets.PasswordInput(),
  29. render_kw={'class': 'form-control'}
  30. )
  31. @app.route('/login', methods=['GET', 'POST'])
  32. def login():
  33. if request.method == 'GET':
  34. form = LoginForm()
  35. return render_template('login.html', form=form)
  36. else:
  37. form = LoginForm(formdata=request.form)
  38. if form.validate():
  39. print('用户提交数据通过格式验证,提交的值为:', form.data)
  40. else:
  41. print(form.errors)
  42. return render_template('login.html', form=form)
  43. if __name__ == '__main__':
  44. app.run()

login.html

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>Title</title>
  6. </head>
  7. <body>
  8. <h1>登录</h1>
  9. <form method="post">
  10. <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>
  11. <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
  12. <input type="submit" value="提交">
  13. </form>
  14. </body>
  15. </html>

使用2:

  1. from flask import Flask, render_template, request, redirect
  2. from wtforms import Form
  3. from wtforms.fields import core
  4. from wtforms.fields import html5
  5. from wtforms.fields import simple
  6. from wtforms import validators
  7. from wtforms import widgets
  8. app = Flask(__name__, template_folder='templates')
  9. app.debug = True
  10. class RegisterForm(Form):
  11. name = simple.StringField(
  12. label='用户名',
  13. validators=[
  14. validators.DataRequired()
  15. ],
  16. widget=widgets.TextInput(),
  17. render_kw={'class': 'form-control'},
  18. default='alex'
  19. )
  20. pwd = simple.PasswordField(
  21. label='密码',
  22. validators=[
  23. validators.DataRequired(message='密码不能为空.')
  24. ],
  25. widget=widgets.PasswordInput(),
  26. render_kw={'class': 'form-control'}
  27. )
  28. pwd_confirm = simple.PasswordField(
  29. label='重复密码',
  30. validators=[
  31. validators.DataRequired(message='重复密码不能为空.'),
  32. validators.EqualTo('pwd', message="两次密码输入不一致")
  33. ],
  34. widget=widgets.PasswordInput(),
  35. render_kw={'class': 'form-control'}
  36. )
  37. email = html5.EmailField(
  38. label='邮箱',
  39. validators=[
  40. validators.DataRequired(message='邮箱不能为空.'),
  41. validators.Email(message='邮箱格式错误')
  42. ],
  43. widget=widgets.TextInput(input_type='email'),
  44. render_kw={'class': 'form-control'}
  45. )
  46. gender = core.RadioField(
  47. label='性别',
  48. choices=(
  49. (1, '男'),
  50. (2, '女'),
  51. ),
  52. coerce=int # “1” “2”
  53. )
  54. city = core.SelectField(
  55. label='城市',
  56. choices=(
  57. ('bj', '北京'),
  58. ('sh', '上海'),
  59. )
  60. )
  61. hobby = core.SelectMultipleField(
  62. label='爱好',
  63. choices=(
  64. (1, '篮球'),
  65. (2, '足球'),
  66. ),
  67. coerce=int
  68. )
  69. favor = core.SelectMultipleField(
  70. label='喜好',
  71. choices=(
  72. (1, '篮球'),
  73. (2, '足球'),
  74. ),
  75. widget=widgets.ListWidget(prefix_label=False),
  76. option_widget=widgets.CheckboxInput(),
  77. coerce=int,
  78. default=[1, 2]
  79. )
  80. def __init__(self, *args, **kwargs):
  81. super(RegisterForm, self).__init__(*args, **kwargs)
  82. self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))
  83. def validate_pwd_confirm(self, field):
  84. """
  85. 自定义pwd_confirm字段规则,例:与pwd字段是否一致
  86. :param field:
  87. :return:
  88. """
  89. # 最开始初始化时,self.data中已经有所有的值
  90. if field.data != self.data['pwd']:
  91. # raise validators.ValidationError("密码不一致") # 继续后续验证
  92. raise validators.StopValidation("密码不一致") # 不再继续后续验证
  93. @app.route('/register', methods=['GET', 'POST'])
  94. def register():
  95. if request.method == 'GET':
  96. form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial
  97. return render_template('register.html', form=form)
  98. else:
  99. form = RegisterForm(formdata=request.form)
  100. if form.validate():
  101. print('用户提交数据通过格式验证,提交的值为:', form.data)
  102. else:
  103. print(form.errors)
  104. return render_template('register.html', form=form)
  105. if __name__ == '__main__':
  106. app.run()
  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>Title</title>
  6. </head>
  7. <body>
  8. <h1>用户注册</h1>
  9. <form method="post" novalidate style="padding:0 50px">
  10. {% for field in form %}
  11. <p>{{field.label}}: {{field}} {{field.errors[0] }}</p>
  12. {% endfor %}
  13. <input type="submit" value="提交">
  14. </form>
  15. </body>
  16. </html>

18.信号

Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为
安装:pip3 install blinker
内置信号:

  1. request_started = _signals.signal('request-started') # 请求到来前执行
  2. request_finished = _signals.signal('request-finished') # 请求结束后执行
  3. before_render_template = _signals.signal('before-render-template') # 模板渲染前执行
  4. template_rendered = _signals.signal('template-rendered') # 模板渲染后执行
  5. got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行
  6. request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否)
  7. appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
  8. appcontext_pushed = _signals.signal('appcontext-pushed') # 应用上下文push时执行
  9. appcontext_popped = _signals.signal('appcontext-popped') # 应用上下文pop时执行
  10. message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发

使用信号:

  1. from flask import Flask,signals,render_template
  2. app = Flask(__name__)
  3. # 往信号中注册函数
  4. def func(*args,**kwargs):
  5. print('触发型号',args,kwargs)
  6. signals.request_started.connect(func)
  7. # 触发信号: signals.request_started.send()
  8. @app.before_first_request
  9. def before_first1(*args,**kwargs):
  10. pass
  11. @app.before_first_request
  12. def before_first2(*args,**kwargs):
  13. pass
  14. @app.before_request
  15. def before_first3(*args,**kwargs):
  16. pass
  17. @app.route('/',methods=['GET',"POST"])
  18. def index():
  19. print('视图')
  20. return render_template('index.html')
  21. if __name__ == '__main__':
  22. app.wsgi_app
  23. app.run()

一个流程中的信号触发点

  1. a. before_first_request
  2. b. 触发 request_started 信号
  3. c. before_request
  4. d. 模板渲染
  5. 渲染前的信号 before_render_template.send(app, template=template, context=context)
  6. rv = template.render(context) # 模板渲染
  7. 渲染后的信号 template_rendered.send(app, template=template, context=context)
  8. e. after_request
  9. f. session.save_session()
  10. g. 触发 request_finished信号
  11. 如果上述过程出错:
  12. 触发错误处理信号 got_request_exception.send(self, exception=e)
  13. h. 触发信号 request_tearing_down

自定义信号:

  1. from flask import Flask, current_app, flash, render_template
  2. from flask.signals import _signals
  3. app = Flask(import_name=__name__)
  4. # 自定义信号
  5. xxxxx = _signals.signal('xxxxx')
  6. def func(sender, *args, **kwargs):
  7. print(sender)
  8. # 自定义信号中注册函数
  9. xxxxx.connect(func)
  10. @app.route("/x")
  11. def index():
  12. # 触发信号
  13. xxxxx.send('123123', k1='v1')
  14. return 'Index'
  15. if __name__ == '__main__':
  16. app.run()

19.多app应用

  1. from werkzeug.wsgi import DispatcherMiddleware
  2. from werkzeug.serving import run_simple
  3. from flask import Flask, current_app
  4. app1 = Flask('app01')
  5. app2 = Flask('app02')
  6. @app1.route('/index')
  7. def index():
  8. return "app01"
  9. @app2.route('/index2')
  10. def index2():
  11. return "app2"
  12. # http://www.oldboyedu.com/index
  13. # http://www.oldboyedu.com/sec/index2
  14. dm = DispatcherMiddleware(app1, {
  15. '/sec': app2,
  16. })
  17. if __name__ == "__main__":
  18. run_simple('localhost', 5000, dm)

20.flask-script

用于实现类似于django中 python3 manage.py runserver …类似的命令
安装:pip3 install flask-script

使用

  1. from flask_script import Manager
  2. app = Flask(__name__)
  3. manager=Manager(app)
  4. ...
  5. if __name__ == '__main__':
  6. manager.run()
  7. #以后在执行,直接:python3 manage.py runserver
  8. #python3 manage.py runserver --help

自定制命令

  1. @manager.command
  2. def custom(arg):
  3. """
  4. 自定义命令
  5. python manage.py custom 123
  6. :param arg:
  7. :return:
  8. """
  9. print(arg)
  10. @manager.option('-n', '--name', dest='name')
  11. #@manager.option('-u', '--url', dest='url')
  12. def cmd(name, url):
  13. """
  14. 自定义命令(-n也可以写成--name)
  15. 执行: python manage.py cmd -n lqz -u http://www.oldboyedu.com
  16. 执行: python manage.py cmd --name lqz --url http://www.oldboyedu.com
  17. :param name:
  18. :param url:
  19. :return:
  20. """
  21. print(name, url)
  22. #有什么用?
  23. #把excel的数据导入数据库,定制个命令,去执行

21flask-admin

安装

  1. pip3 install flask_admin

简单使用

  1. from flask import Flask
  2. from flask_admin import Admin
  3. app = Flask(__name__)
  4. #将app注册到adminzhong
  5. admin = Admin(app)
  6. if __name__=="mian":
  7. app.run()
  8. #访问
  9. #127.0.0.1:5000/admin端口,会得到一个空白的页面

将表模型注册到admin中

  1. #在将表注册之前应该对app进行配置
  2. SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:@127.0.0.1:3307/py9api?charset=utf8mb4"
  3. SQLALCHEMY_POOL_SIZE = 5
  4. SQLALCHEMY_POOL_TIMEOUT = 30
  5. SQLALCHEMY_POOL_RECYCLE = -1
  6. #导入models文件的中的表模型
  7. from flask_admin.contrib.sqla import ModelView
  8. from api.models import Stock,Product,Images,Category,Wxuser,Banner
  9. admin.add_view(ModelView(Stock, db.session))
  10. admin.add_view(ModelView(Product, db.session))
  11. admin.add_view(ModelView(Category, db.session))

如果有个字段是图片指端

  1. #配置上传文件的路径
  2. #导入from flask_admin.contrib.fileadmin import FileAdmin
  3. from flask_admin.contrib.fileadmin import FileAdmin,form
  4. file_path = op.join(op.dirname(__file__), 'static')
  5. admin = Admin(app)
  6. admin.add_view(FileAdmin(file_path, '/static/', name='文件'))
  7. #如果有个字段要是上传文件重写该方法的modleView类,假设imgae_url是文件图片的字段
  8. class ImagesView(ModelView):
  9. form_extra_fields = {
  10. 'image_url': form.ImageUploadField('Image',
  11. base_path=file_path,
  12. relative_path='uploadFile/'
  13. )
  14. }
  15. admin.add_view(ImagesView(Images, db.session))