大家好~我是米洛

我正在从0到1打造一个开源的接口测试平台, 也在编写一套与之对应的完整教程,希望大家多多支持。

欢迎关注我的公众号米洛的测开日记,获取最新文章教程!

回顾

上一节我们简单介绍了下APScheduler,这一节我们来编写测试计划相关内容。

设计测试计划表

测试计划,其实也可以叫测试集合,它是我们一组用例的集合。并且有着对应的特征:

  • 定时执行
  • 执行完毕后通知方式
  • 通过率低于多少发送邮件/钉钉等通知
  • 优先级
  • 涵盖多少case
  • case失败重试间隔 等等
    这里可能比较的就是把测试计划+测试集合给耦合到一起了。
    基于上面的思路,我们就可以来设计测试计划表了, 具体字段的含义可以参照注释。
    (数据表的设计不一定完美,后续一般会根据业务需求扩展)
  1. from sqlalchemy import Column, String, TEXT, UniqueConstraint, BOOLEAN, SMALLINT, INT
  2. from sqlalchemy.dialects.mysql import TINYTEXT
  3. from app.models.basic import PityBase
  4. class PityTestPlan(PityBase):
  5. project_id = Column(INT, nullable=False)
  6. # 测试计划执行环境, 可以多选
  7. env = Column(String(64), nullable=False)
  8. # 测试计划名称
  9. name = Column(String(32), nullable=False)
  10. # 测试计划优先级
  11. priority = Column(String(3), nullable=False)
  12. # cron表达式
  13. cron = Column(String(12), nullable=False)
  14. # 用例列表
  15. case_list = Column(TEXT, nullable=False)
  16. # 并行/串行(是否顺序执行)
  17. ordered = Column(BOOLEAN, default=False)
  18. # 通过率低于这个数会自动发通知
  19. pass_rate = Column(SMALLINT, default=80)
  20. # 通知用户,目前只有邮箱,后续用户表可能要完善手机号字段,为了通知
  21. receiver = Column(TEXT)
  22. # 通知方式 0: 邮件 1: 钉钉 2: 企业微信 3: 飞书 支持多选
  23. msg_type = Column(TINYTEXT)
  24. # 单次case失败重试间隔,默认2分钟
  25. retry_minutes = Column(SMALLINT, default=2)
  26. # 测试计划是否正在执行中
  27. state = Column(SMALLINT, default=0, comment="0: 未开始 1: 运行中")
  28. __table_args__ = (
  29. UniqueConstraint('project_id', 'name', 'deleted_at'),
  30. )
  31. __tablename__ = "pity_test_plan"
  32. def __init__(self, project_id, env, case_list, name, priority, cron, ordered, pass_rate, receiver, msg_type,
  33. retry_minutes, user, state=0, id=None):
  34. super().__init__(user, id)
  35. self.env = ",".join(map(str, env))
  36. self.case_list = ",".join(map(str, case_list))
  37. self.name = name
  38. self.project_id = project_id
  39. self.priority = priority
  40. self.ordered = ordered
  41. self.cron = cron
  42. self.pass_rate = pass_rate
  43. self.receiver = ",".join(map(str, receiver))
  44. self.msg_type = ",".join(map(str, msg_type))
  45. self.retry_minutes = retry_minutes
  46. self.state = state

这里值得注意的是,我们定义的FORM数据,环境列表env、接收人receiver、用例列表case_list都是数组,我们需要进行一波转换

编写CRUD方法

  • 抽离异步分页方法和where方法

测试平台系列(73) 设计测试计划功能 - 图1

在DatabaseHelper类中添加pagination方法,接受page和size,session和sql参数,先读出sql匹配到的总数,如果为0则直接return,否则通过offset和limit获取到对应分页的数据。

where方法是用于改进我们平时的多条件查询,类似这种:

测试平台系列(73) 设计测试计划功能 - 图2

  • 编写新增测试计划方法

测试平台系列(73) 设计测试计划功能 - 图3

可以看到改造之后,我们只需要调用where方法,不需要写if name != "":这样的语句了。

因为我们不允许同一个项目里面出现同名的测试计划,所以条件是项目id+name不能重复。

  • 编写增改删方法
  1. from sqlalchemy import select
  2. from app.models import async_session, DatabaseHelper
  3. from app.models.schema.test_plan import PityTestPlanForm
  4. from app.models.test_plan import PityTestPlan
  5. from app.utils.logger import Log
  6. class PityTestPlanDao(object):
  7. log = Log("PityTestPlanDao")
  8. @staticmethod
  9. async def list_test_plan(page: int, size: int, project_id: int = None, name: str = ''):
  10. try:
  11. async with async_session() as session:
  12. conditions = [PityTestPlan.deleted_at == 0]
  13. DatabaseHelper.where(project_id, PityTestPlan.project_id == project_id, conditions) \
  14. .where(name, PityTestPlan.name.like(f"%{name}%"), conditions)
  15. sql = select(PityTestPlan).where(conditions)
  16. result, total = await DatabaseHelper.pagination(page, size, session, sql)
  17. return result, total
  18. except Exception as e:
  19. PityTestPlanDao.log.error(f"获取测试计划失败: {str(e)}")
  20. raise Exception(f"获取测试计划失败: {str(e)}")
  21. @staticmethod
  22. async def insert_test_plan(plan: PityTestPlanForm, user: int):
  23. try:
  24. async with async_session() as session:
  25. async with session.begin():
  26. query = await session.execute(select(PityTestPlan).where(PityTestPlan.project_id == plan.project_id,
  27. PityTestPlan.name == plan.name,
  28. PityTestPlan.deleted_at == 0))
  29. if query.scalars().first() is not None:
  30. raise Exception("测试计划已存在")
  31. plan = PityTestPlan(**plan.dict(), user=user)
  32. await session.add(plan)
  33. except Exception as e:
  34. PityTestPlanDao.log.error(f"新增测试计划失败: {str(e)}")
  35. raise Exception(f"添加失败: {str(e)}")
  36. @staticmethod
  37. async def update_test_plan(plan: PityTestPlanForm, user: int):
  38. try:
  39. async with async_session() as session:
  40. async with session.begin():
  41. query = await session.execute(
  42. select(PityTestPlan).where(PityTestPlan.id == plan.id, PityTestPlan.deleted_at == 0))
  43. data = query.scalars().first()
  44. if data is None:
  45. raise Exception("测试计划不存在")
  46. DatabaseHelper.update_model(data, plan, user)
  47. except Exception as e:
  48. PityTestPlanDao.log.error(f"编辑测试计划失败: {str(e)}")
  49. raise Exception(f"编辑失败: {str(e)}")
  50. @staticmethod
  51. async def delete_test_plan(id: int, user: int):
  52. try:
  53. async with async_session() as session:
  54. async with session.begin():
  55. query = await session.execute(
  56. select(PityTestPlan).where(PityTestPlan.id == plan.id, PityTestPlan.deleted_at == 0))
  57. data = query.scalars().first()
  58. if data is None:
  59. raise Exception("测试计划不存在")
  60. DatabaseHelper.delete_model(data, user)
  61. except Exception as e:
  62. PityTestPlanDao.log.error(f"删除测试计划失败: {str(e)}")
  63. raise Exception(f"删除失败: {str(e)}")
  64. @staticmethod
  65. async def query_test_plan(id: int) -> PityTestPlan:
  66. try:
  67. async with async_session() as session:
  68. sql = select(PityTestPlan).where(PityTestPlan.deleted_at == 0, PityTestPlan.id == id)
  69. data = await session.execute(sql)
  70. return data.scalars().first()
  71. except Exception as e:
  72. PityTestPlanDao.log.error(f"获取测试计划失败: {str(e)}")
  73. raise Exception(f"获取测试计划失败: {str(e)}")

基本思路差不多,老CRUD了!这里就不多说了,对sqlalchemy的async内容不了解的可以去官网看看demo。

这个query方法,是给定时任务查询测试计划数据使用。由于做了软删除,会导致经常忘记带上deleted_at==0的条件

编写相关接口(app/routers/testcase/testplan.py)

  1. from fastapi import Depends
  2. from app.dao.test_case.TestPlan import PityTestPlanDao
  3. from app.handler.fatcory import PityResponse
  4. from app.models.schema.test_plan import PityTestPlanForm
  5. from app.routers import Permission
  6. from app.routers.testcase.testcase import router
  7. from config import Config
  8. @router.get("/plan/list")
  9. async def list_test_plan(page: int, size: int, project_id: int = None, name: str = "", user_info=Depends(Permission())):
  10. try:
  11. data, total = await PityTestPlanDao.list_test_plan(page, size, project_id, name)
  12. return PityResponse.success_with_size(PityResponse.model_to_list(data), total=total)
  13. except Exception as e:
  14. return PityResponse.failed(str(e))
  15. @router.get("/plan/insert")
  16. async def insert_test_plan(form: PityTestPlanForm, user_info=Depends(Permission(Config.MANAGER))):
  17. try:
  18. await PityTestPlanDao.insert_test_plan(form, user_info['id'])
  19. return PityResponse.success()
  20. except Exception as e:
  21. return PityResponse.failed(str(e))
  22. @router.get("/plan/update")
  23. async def update_test_plan(form: PityTestPlanForm, user_info=Depends(Permission(Config.MANAGER))):
  24. try:
  25. await PityTestPlanDao.update_test_plan(form, user_info['id'])
  26. return PityResponse.success()
  27. except Exception as e:
  28. return PityResponse.failed(str(e))
  29. @router.get("/plan/delete")
  30. async def delete_test_plan(id: int, user_info=Depends(Permission(Config.MANAGER))):
  31. try:
  32. await PityTestPlanDao.delete_test_plan(id, user_info['id'])
  33. return PityResponse.success()
  34. except Exception as e:
  35. return PityResponse.failed(str(e))

这边我们把测试计划的增删改权限赋予MANAGER,但其实最好是能给对应的项目经理,不过那样会稍微复杂点。我们暂时先偷个懒,或许再完善

今天的内容就分享到这里,下节会介绍测试计划如何和APScheduler结合起来,之后便是编写测试计划的前端页面部分,完成定时任务功能。