引言 最近在编写Flask的过程中,部分接口需要token鉴权操作。于是单独编写一个“token(login)的装饰器”来解决相关token问题。

1.代码实现

1.token装饰器

目的token装饰器:进行部分接口的token鉴权操作。

  1. mport functools
  2. from flask import request, make_response, current_app
  3. from utils.optUtils import traceID
  4. from utils.responseUtil import responseUtil
  5. from functools import wraps
  6. from utils.redisUtil import ops_string
  7. def check_login_status(func):
  8. @functools.wraps(func)
  9. def wrapper(*args, **kwargs):
  10. # 如果获取到 session , 即判定为 已登录
  11. print("ckeck")
  12. traceId = traceID()
  13. try:
  14. requestToken = request.headers['token']
  15. print("requestToken:\t", requestToken)
  16. redisOps = ops_string()
  17. requestTokenSearchInRedis = redisOps.get(requestToken)
  18. print("requestTokenSearchInRedis:\t", requestTokenSearchInRedis)
  19. if requestTokenSearchInRedis is None:
  20. msg = {"code": 500, "msg": "请重新登陆[token错误]"}
  21. # msg = {"code": 500, "msg": "请重新登陆", "traceId": traceId}
  22. response = make_response(msg)
  23. # responseUtil(current_app, logLevel='error', codeFile=codeFile, request=request, requestBody=None,
  24. # response=response)
  25. responseUtil(current_app, logLevel='error', codeFile=None, request=request, requestBody=None,
  26. response=response)
  27. return response, 500
  28. # if request.cookies:
  29. # return func(*args, **kwargs)
  30. else:
  31. # 此处是核心
  32. return func()
  33. except:
  34. msg = {"code": 500, "msg": "请重新登陆[缺失token]", "traceId": traceId}
  35. response = make_response(msg)
  36. responseUtil(current_app, logLevel='error', request=request, requestBody=None,
  37. response=response)
  38. return response, 500
  39. return wrapper

2.使用token装饰器

  1. 【更新-计划】
  2. @planApp.post("/update")
  3. @check_login_status # 使用token装饰器
  4. def updatePlan():
  5. traceId = traceID()
  6. requestBody = request.get_json()
  7. print("requestBody:\t", requestBody)
  8. id = requestBody['id']
  9. resID = db.session.query(Plan).filter_by(id=id).first()
  10. print("resID:\t", resID)
  11. if resID is not None:
  12. del requestBody['id'] # 不更新id值
  13. print("处理:\t", requestBody)
  14. Plan.query.filter_by(id=id).update(requestBody)
  15. msg = {"code": 0, "msg": "计划更新成功", "traceId": traceId}
  16. # dictSort(data=msg) # 按照ascii升序排序
  17. response = make_response(msg)
  18. responseUtil(current_app, logLevel='info', request=request,
  19. requestBody=requestBody,
  20. response=response)
  21. return response, 200
  22. else:
  23. print("计划更新失败")
  24. msg = {"code": 502, "msg": "计划更新失败", "traceId": traceId}
  25. response = make_response(msg)
  26. responseUtil(current_app, logLevel='error', request=request,
  27. requestBody=requestBody,
  28. response=response)
  29. return response, 502

3.运行验证

image.png

2.【最简示例】flask装饰器

1.代码实现

  1. def check1(func):
  2. @functools.wraps(func)
  3. def wrapper(*args, **kwargs):
  4. print("ckeck")
  5. if 1 == 1:
  6. # func()
  7. return func()
  8. else:
  9. print("error")
  10. return wrapper
  11. @planApp.get("/check")
  12. @check1
  13. def hello():
  14. print("hello-world")
  15. return "hello check"

2.运行验证

image.png