引言 在最近使用Flask-SQLAlchemy的过程中,遇到了几个新问题及对应的解决方法,如下所示:

  • 【问题1】怎么实现“动态查询”
    • 【方案】请求参数改写为kwargs形式**。
  • 【问题2】怎么实现“动态查询数据的序列化输出”
    • 【方案】借助“ flask_marshmallow”进行实现。

1.动态查询

在Flask-SQLAlchemy中,我们可以将“请求参数改写为kwargs形式”,例如:“resID = db.session.query(Plan).filter_by(**requestBody**).all() ”,具体实现如下所示:

  1. @planApp.post("/select")
  2. @check_login_status # 定义:检查token的装饰器
  3. def selectPlan():
  4. traceId = traceID()
  5. requestBody = request.get_json()
  6. print("requestBody:\t", requestBody)
  7. resID = db.session.query(Plan).filter_by(**requestBody).all() # 实现动态查询的核心代码
  8. print("【原始结果】 resID:\t", resID)
  9. resCount = db.session.query(Plan).filter_by(**requestBody).count()
  10. print("表数据\t", resID)
  11. print("表数据resCount\t", resCount)
  12. if resCount > 0:
  13. # 多个数据结果的组装
  14. plan_schema = PlanSchema() #
  15. plan_data = {"data": []}
  16. for x in range(resCount):
  17. # 进行字典拼接操作
  18. plan_data["data"] = plan_data["data"] + [plan_schema.dump(resID[x])]
  19. # print("表数据res=\t", plan_data)
  20. print("plan_data-type=\t", type(plan_data))
  21. print("最终-表数据res=\t", plan_data)
  22. msg = {"code": 0, "msg": "查询成功", "traceId": traceId, "success": True, "data": plan_data,
  23. "errorMessage": "成功"}
  24. # msg.update(data)
  25. return jsonify(msg), 200
  26. else:
  27. plan_data = {"code": 501, "msg": "查询失败", "traceId": traceId}
  28. return jsonify(plan_data), 500

2.动态查询结果的序列化

1.未进行序列化操作

在Flask-SQLAlchemy的”db.session.qurey()”通常会返回“实体对象(即(理解为)查询结果的id)”,如下图所示

  1. @planApp.post("/select")
  2. @check_login_status # 定义:检查token的装饰器
  3. def selectPlan():
  4. traceId = traceID()
  5. requestBody = request.get_json()
  6. print("requestBody:\t", requestBody)
  7. resID = db.session.query(Plan).filter_by(**requestBody).all() # 实现动态查询的核心代码
  8. print("【原始结果】 resID:\t", resID)
  9. #非核心代码省略

image.png

2.进行序列化操作

1.操作步骤流程

2.【进阶】Flask-SQLAlchemy之动态查询 - 图2

2.具体实现

1.安装flask_marshmallow

  1. pipenv install flask_marshmallow

2.【自定义—工具】ormDB.py

说明:笔者自定义一个工具,并命名为ormDB.py。

  1. from flask_sqlalchemy import SQLAlchemy
  2. from flask_marshmallow import Marshmallow
  3. db = SQLAlchemy()
  4. ma = Marshmallow()

3.【核心】【自定义—数据模型Model】models.py

说明:笔者自定义一个py文件【命名:models.py】,用于定义Flask-SQLAlchemy中的数据模型及数据序列化展示/输出。

  1. from utils.ormDB import db, ma # 导入自定义的[ormDB]ma、db
  2. # 定义数据模型
  3. class Plan(db.Model):
  4. __tablename__ = 'plan' # 设置表名, 表名默认为类名小写
  5. id = db.Column(db.Integer, primary_key=True, autoincrement=True) # 设置主键, 默认自增
  6. planName = db.Column(db.String(20), nullable=False) # 设置字段名 和 不为空
  7. taskNumber = db.Column(db.Integer, default=10, nullable=False) # 设置默认值
  8. # 【数据序列化】数据转dict
  9. #PlanSchema:集成ma.SQLAlchemyAutoSchema
  10. class PlanSchema(ma.SQLAlchemyAutoSchema):
  11. # 子类Meta:设置属性model、load_instance的value
  12. class Meta:
  13. model = Plan
  14. load_instance = True

4.Flask实例化对象—初始化db、ma

说明:笔者的“.Flask实例化对象存在于自定义的runMain.py中”。

  1. import resource.flaskConfig as FlaskConfig # 导入flask的配置文件,并设置别名:FlaskConfig
  2. from flask import Flask, make_response, current_app, request, jsonify
  3. from flask_cors import CORS
  4. from resource.blueprintConfig import bpConfig
  5. # 【start-工具utils】
  6. from utils.logUtils import logUtils # 【导入】日志工具-函数
  7. from utils.optUtils import traceID # 【导入】uuid工具-函数
  8. from utils.responseUtil import responseUtil
  9. from utils.optUtils import getPyFileName,getFunctonName
  10. # 【start-工具utils】
  11. from apiflask import APIFlask
  12. import json
  13. from utils.ormDB import db, ma
  14. app = Flask(__name__) # 实例化Flask对象
  15. # app = APIFlask(__name__)
  16. CORS(app) # 解决跨域
  17. bpConfig(app) # Flask蓝图配置
  18. app.config.update(RESTFUL_JSON=dict(ensure_ascii=False)) # 加载Flask配置
  19. app.config.from_object(FlaskConfig) # 加载flask的配置文件
  20. # 【核心代码】【使用/绑定】日志工具函数
  21. logUtils(app)
  22. db.init_app(app) # [初始化] db对象
  23. ma.init_app(app) # [初始化] ma对象

3.验证

1.被验证的接口

我们选择“已实现Flask-SQLAlchemy动态查询”的接口:“xxx/plan/select”

  1. @planApp.post("/select")
  2. @check_login_status # 定义:检查token的装饰器
  3. def selectPlan():
  4. traceId = traceID()
  5. requestBody = request.get_json()
  6. print("requestBody:\t", requestBody)
  7. resID = db.session.query(Plan).filter_by(**requestBody).all() # 实现动态查询的核心代码
  8. print("【原始结果】 resID:\t", resID)
  9. resCount = db.session.query(Plan).filter_by(**requestBody).count()
  10. print("表数据\t", resID)
  11. print("表数据resCount\t", resCount)
  12. if resCount > 0:
  13. # 多个数据结果的组装
  14. plan_schema = PlanSchema() #
  15. plan_data = {"data": []}
  16. for x in range(resCount):
  17. # 进行字典拼接操作
  18. plan_data["data"] = plan_data["data"] + [plan_schema.dump(resID[x])]
  19. # print("表数据res=\t", plan_data)
  20. print("plan_data-type=\t", type(plan_data))
  21. print("最终-表数据res=\t", plan_data)
  22. msg = {"code": 0, "msg": "查询成功", "traceId": traceId, "success": True, "data": plan_data,
  23. "errorMessage": "成功"}
  24. # msg.update(data)
  25. return jsonify(msg), 200
  26. else:
  27. plan_data = {"code": 501, "msg": "查询失败", "traceId": traceId}
  28. return jsonify(plan_data), 500

2.apifox请求验证

说明:如下图所示,则表示“动态查询结果数据序列化输出/显示—->成功、通过”

image.png

学习资料 《Flask之SQLAlchemy操作》 https://www.csdn.net/tags/Ntzacg5sOTMxNDktYmxvZwO0O0OO0O0O.html